[GitHub] [kafka] vamossagar12 commented on pull request #13733: KAFKA-13337: fix of possible java.nio.file.AccessDeniedException during Connect plugin directory scan
vamossagar12 commented on PR #13733: URL: https://github.com/apache/kafka/pull/13733#issuecomment-1560458136

Thanks @akatona84 for the clarification. Going by what @gharris1727 says above, i.e.

> When a directory is not accessible, only plugins within that directory should be missing. Plugins from other directories should still be visible if possible.

the changes you have made in this PR are relevant. The changes look fine to me.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
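The behavior @gharris1727 describes can be sketched in plain Java NIO. This is a hedged illustration, not Connect's actual scanner code: the class name, method name, and logging are invented; the point is that a per-directory catch of `AccessDeniedException` lets plugins from the remaining directories still be discovered.

```java
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the discussed behavior: an unreadable plugin
// directory is logged and skipped, so other directories still contribute.
public class PluginDirScanSketch {

    static List<Path> listPlugins(List<Path> pluginDirs) {
        List<Path> found = new ArrayList<>();
        for (Path dir : pluginDirs) {
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
                for (Path p : stream) {
                    found.add(p);
                }
            } catch (AccessDeniedException e) {
                // Only this directory's plugins go missing.
                System.err.println("Skipping unreadable plugin dir: " + dir);
            } catch (IOException e) {
                System.err.println("Skipping plugin dir " + dir + ": " + e);
            }
        }
        return found;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("plugins");
        Files.createFile(tmp.resolve("connector.jar"));
        // The bad directory is skipped; the readable one still contributes.
        List<Path> plugins = listPlugins(Arrays.asList(Paths.get("/no/such/dir"), tmp));
        System.out.println(plugins.size());
    }
}
```

Without the per-directory catch, a single inaccessible directory would abort the whole scan, which is the failure mode the PR addresses.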
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1181520975

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
@@ -581,11 +588,18 @@ public void run() {
 if (isLeader()) {
 // Copy log segments to remote storage
 copyLogSegmentsToRemote();
+// Cleanup/delete expired remote log segments
+cleanupExpiredRemoteLogSegments();

Review Comment: This is different from local log deletion, which has to actually delete the segment files from local storage. In the case of remote storage, we do not wait for the data to be deleted: the file or object is only marked for deletion in the respective metadata store, and the storage's garbage collector deletes the data asynchronously. There is no performance impact from these delete calls, since they take much less time than copying segments, so it is very unlikely that copying segments is affected by segment deletion. Deletion checks happen in every iteration, so there will not be many segments to delete at once. In any case, we can discuss this separately in a separate JIRA. On another note, all of this logic will move to UnifiedLog in the future.
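The mark-then-collect pattern described in the comment above can be modeled in a few lines. This is a toy sketch with invented names (it is not Kafka's remote log metadata API): the "delete" call only flips a state in a metadata map and returns immediately, while a background task standing in for the remote storage's garbage collector reclaims the data later.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of asynchronous remote-segment deletion: deletion is a cheap
// metadata update, not a synchronous data delete.
public class RemoteDeleteSketch {
    enum State { COPY_FINISHED, DELETE_STARTED, DELETE_FINISHED }

    // Stand-in for the remote log metadata store.
    static final Map<String, State> metadataStore = new ConcurrentHashMap<>();
    // Stand-in for the remote storage's garbage collector.
    static final ExecutorService gc = Executors.newSingleThreadExecutor();

    // Cheap compared to copying a segment: one metadata write, no data I/O.
    static void deleteSegment(String segmentId) {
        metadataStore.put(segmentId, State.DELETE_STARTED);
        gc.submit(() -> metadataStore.put(segmentId, State.DELETE_FINISHED));
    }

    public static void main(String[] args) throws InterruptedException {
        metadataStore.put("segment-0", State.COPY_FINISHED);
        deleteSegment("segment-0");               // returns immediately
        gc.shutdown();
        gc.awaitTermination(5, TimeUnit.SECONDS); // let the "GC" run
        System.out.println(metadataStore.get("segment-0"));
    }
}
```

This is why the delete calls in the copy loop are unlikely to slow down segment copying: the expensive byte-level cleanup happens off the critical path.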
[jira] [Updated] (KAFKA-14836) Fix UtilsTest#testToLogDateTimeFormat failure in some cases
[ https://issues.apache.org/jira/browse/KAFKA-14836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-14836:
------------------------------
    Fix Version/s: 3.4.1

> Fix UtilsTest#testToLogDateTimeFormat failure in some cases
> -----------------------------------------------------------
>
>                 Key: KAFKA-14836
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14836
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Tamas Barnabas Egyed
>            Assignee: Tamas Barnabas Egyed
>          Priority: Major
>             Fix For: 3.5.0, 3.4.1
>
> {code:java}
> org.apache.kafka.common.utils.UtilsTest.testToLogDateTimeFormat
> {code}
> This test fails in some cases. It uses a hard-coded datetime ({*}2020-11-09 12:34:05{*}), while the expected side of the assertion contains the offset of the current time ({*}Instant.now(){*}). This breaks when the location observes daylight saving time and the offset now differs from the offset at the hard-coded datetime. We should query the offset at that same datetime instead of the current time.
> {noformat}
> org.opentest4j.AssertionFailedError: expected: <2020-11-09 12:34:05,123 -07:00> but was: <2020-11-09 12:34:05,123 -08:00>
> {noformat}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
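The fix idea in the issue above can be shown with `java.time` directly. This is an illustrative sketch, not the actual test code; the Los Angeles zone is an assumption chosen because it observes daylight saving and matches the -07:00/-08:00 offsets in the failure message.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Resolve the zone offset at the hard-coded test datetime, not at
// Instant.now(); otherwise the expected string flips with DST.
public class OffsetAtSameInstant {

    static ZoneOffset offsetAt(LocalDateTime dateTime, ZoneId zone) {
        // Offset in effect at the given local datetime in this zone.
        return zone.getRules().getOffset(dateTime);
    }

    public static void main(String[] args) {
        ZoneId la = ZoneId.of("America/Los_Angeles"); // assumed zone for illustration
        LocalDateTime hardCoded = LocalDateTime.of(2020, 11, 9, 12, 34, 5);

        // Fragile: -07:00 in summer, -08:00 in winter.
        ZoneOffset atNow = la.getRules().getOffset(Instant.now());

        // Stable: 2020-11-09 is after the DST end, so this is always -08:00.
        ZoneOffset atTestTime = offsetAt(hardCoded, la);

        System.out.println(atNow + " vs " + atTestTime);
    }
}
```

Querying `ZoneRules` with the same datetime the test formats makes the expected and actual offsets agree regardless of when the test runs.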
[GitHub] [kafka] showuon merged pull request #13744: MINOR: update 3.4.1 licence
showuon merged PR #13744: URL: https://github.com/apache/kafka/pull/13744
[GitHub] [kafka] showuon commented on pull request #13745: KAFKA-15015: Explicit on reload4j version
showuon commented on PR #13745: URL: https://github.com/apache/kafka/pull/13745#issuecomment-1560339089

Backported to the 3.4 and 3.5 branches. @mimaison, if you have another RC build, this duplicated reload4j binary issue will be fixed. FYI
[jira] [Resolved] (KAFKA-15015) Binaries contain 2 versions of reload4j
[ https://issues.apache.org/jira/browse/KAFKA-15015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-15015.
-------------------------------
    Fix Version/s: 3.5.0
                   3.4.1
       Resolution: Fixed

> Binaries contain 2 versions of reload4j
> ---------------------------------------
>
>                 Key: KAFKA-15015
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15015
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.5.0, 3.4.1
>            Reporter: Mickael Maison
>            Assignee: Atul Sharma
>          Priority: Major
>             Fix For: 3.5.0, 3.4.1
>
> These releases ship 2 versions of reload4j:
> - reload4j-1.2.19.jar
> - reload4j-1.2.25.jar
[GitHub] [kafka] showuon merged pull request #13745: KAFKA-15015: Explicit on reload4j version
showuon merged PR #13745: URL: https://github.com/apache/kafka/pull/13745
[GitHub] [kafka] showuon commented on pull request #13745: KAFKA-15015: Explicit on reload4j version
showuon commented on PR #13745: URL: https://github.com/apache/kafka/pull/13745#issuecomment-1560336419

Failed tests also failed in the trunk build:
```
Build / JDK 17 and Scala 2.13 / kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
Build / JDK 17 and Scala 2.13 / kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
Build / JDK 17 and Scala 2.13 / kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
Build / JDK 17 and Scala 2.13 / kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
Build / JDK 11 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
Build / JDK 11 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testSubscribeWhenTopicUnavailable()
Build / JDK 11 and Scala 2.13 / kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
Build / JDK 11 and Scala 2.13 / kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
Build / JDK 11 and Scala 2.13 / kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
Build / JDK 11 and Scala 2.13 / kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
Build / JDK 8 and Scala 2.12 / kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
Build / JDK 8 and Scala 2.12 / kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
Build / JDK 8 and Scala 2.12 / kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
Build / JDK 8 and Scala 2.12 / kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
```
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203225900 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203224577 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203222949 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static
[GitHub] [kafka] junrao commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
junrao commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1203191600

## core/src/main/scala/kafka/server/BrokerServer.scala:
@@ -522,7 +522,12 @@ class BrokerServer(
 }
 Some(new RemoteLogManager(remoteLogManagerConfig, config.brokerId, config.logDirs.head, time,
-  (tp: TopicPartition) => logManager.getLog(tp).asJava));
+  (tp: TopicPartition) => logManager.getLog(tp).asJava,
+  (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
+    logManager.getLog(tp).foreach(log => {
+      log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset)
+    })
+  }));

Review Comment: unnecessary semicolon. Ditto in KafkaServer.

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
@@ -618,6 +629,193 @@ public void run() {
 }
 }
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+    logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+    updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+}
+
+class RemoteLogRetentionHandler {
+
+    private long remainingBreachedSize = 0L;
+    private OptionalLong logStartOffset = OptionalLong.empty();
+
+    public RemoteLogRetentionHandler(long remainingBreachedSize) {
+        this.remainingBreachedSize = remainingBreachedSize;
+    }
+
+    private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+            // Assumption that segments contain size > 0
+            if (checkSizeRetention && remainingBreachedSize > 0) {
+                remainingBreachedSize -= x.segmentSizeInBytes();
+                return remainingBreachedSize >= 0;
+            } else return false;
+        });
+        if (isSegmentDeleted) {
+            logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+            logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                "${log.config.retentionSize} breach. Log size after deletion will be " +
+                "${remainingBreachedSize + log.config.retentionSize}.");
+        }
+        return isSegmentDeleted;
+    }
+
+    private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+        if (isSegmentDeleted) {
+            remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+            logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+        }
+
+        // No need to update the logStartOffset.
+        return isSegmentDeleted;
+    }
+
+    // There are two cases:
+    // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+    //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+    // 2) To remove the unreferenced segments.
+    private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+            x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+        if (isSegmentDeleted) {
+            logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+        }
+
+        // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+        return isSegmentDeleted;
+    }
+
+    private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate predicate)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        if
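The retention-size bookkeeping quoted in the hunk above subtracts each candidate segment's size from the breached amount and deletes while the result stays non-negative. A standalone sketch of that arithmetic follows; the names are invented and it works on raw sizes rather than Kafka's `RemoteLogSegmentMetadata`, so treat it as an illustration of the loop, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of size-based retention: walk segments oldest-first and mark
// them for deletion while the log is still over its retention size.
public class SizeRetentionSketch {

    // segmentSizes is ordered oldest to newest; returns the sizes of the
    // segments that would be deleted.
    static List<Long> segmentsToDelete(List<Long> segmentSizes, long retentionSize) {
        long totalSize = segmentSizes.stream().mapToLong(Long::longValue).sum();
        // How many bytes over retention we currently are.
        long remainingBreachedSize = totalSize - retentionSize;
        List<Long> toDelete = new ArrayList<>();
        for (long size : segmentSizes) {
            if (remainingBreachedSize <= 0) break;
            remainingBreachedSize -= size;
            // Mirrors the quoted predicate: delete only while the running
            // balance stays non-negative after subtracting this segment.
            if (remainingBreachedSize >= 0) {
                toDelete.add(size);
            }
        }
        return toDelete;
    }

    public static void main(String[] args) {
        // 5 segments of 100 bytes with retention 250: the breach is 250,
        // so the two oldest segments qualify under the >= 0 predicate.
        System.out.println(segmentsToDelete(Arrays.asList(100L, 100L, 100L, 100L, 100L), 250L).size());
    }
}
```

One visible consequence of the `>= 0` predicate is that a segment whose deletion would overshoot the breach is kept, so the log can remain slightly above the retention size after cleanup.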
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203192959 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2080 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203184877 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineInteger; +import org.apache.kafka.timeline.TimelineObject; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; + +/** + * A Consumer Group. All the metadata in this class are backed by + * records in the __consumer_offsets partitions. 
+ */ +public class ConsumerGroup implements Group { + +public enum ConsumerGroupState { +EMPTY("empty"), +ASSIGNING("assigning"), +RECONCILING("reconciling"), +STABLE("stable"), +DEAD("dead"); + +private final String name; + +ConsumerGroupState(String name) { +this.name = name; +} + +@Override +public String toString() { +return name; +} +} + +/** + * The snapshot registry. + */ +private final SnapshotRegistry snapshotRegistry; + +/** + * The group id. + */ +private final String groupId; + +/** + * The group state. + */ +private final TimelineObject<ConsumerGroupState> state; + +/** + * The group epoch. The epoch is incremented whenever the subscriptions + * are updated and it will trigger the computation of a new assignment + * for the group. + */ +private final TimelineInteger groupEpoch; + +/** + * The group members. + */ +private final TimelineHashMap<String, ConsumerGroupMember> members; + +/** + * The metadata of the subscribed topics. + */ +private final TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata; + +/** + * The assignment epoch. An assignment epoch smaller than the group epoch means + * that a new assignment is required. The assignment epoch is updated when a new + * assignment is installed. + */ +private final TimelineInteger assignmentEpoch; + +/** + * The target assignment. + */ +private final TimelineHashMap<String, Assignment> assignments; + +/** + * The current partition epoch maps each topic-partitions to their current epoch where + * the epoch is the epoch of their owners. When a member revokes a partition, it removes + * itself from this map. When a member gets a partition, it adds itself to this map. 
+ */ +private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch; + +public ConsumerGroup( +SnapshotRegistry snapshotRegistry, +String groupId +) { +this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry); +this.groupId = Objects.requireNonNull(groupId); +this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY); +this.groupEpoch = new TimelineInteger(snapshotRegistry); +this.members = new TimelineHashMap<>(snapshotRegistry, 0); +this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0); +this.assignmentEpoch = new TimelineInteger(snapshotRegistry); +this.assignments = new TimelineHashMap<>(snapshotRegistry, 0); +this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0); +} + +/** + * The type of this group. + * + * @return The group type (Consumer). + */ +@Override +public GroupType type() { +return GroupType.CONSUMER; +} + +/** + * The state of this group. + * + * @return The current state as a String. + */ +@Override +public String stateAsString() { +return
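The quoted javadoc describes how the two epochs interact: the group epoch moves whenever subscriptions change, and an assignment epoch that lags behind it signals that a new target assignment must be computed. A minimal sketch of that bookkeeping, using plain ints in place of Kafka's SnapshotRegistry-backed TimelineInteger (the names and methods here are illustrative, not the real coordinator API):

```java
// Hedged sketch of the group-epoch / assignment-epoch relationship described
// in the javadoc above. Plain ints stand in for TimelineInteger.
public class EpochSketch {
    private int groupEpoch;       // bumped whenever the subscriptions are updated
    private int assignmentEpoch;  // catches up when a new assignment is installed

    // A subscription change invalidates the current target assignment.
    public void onSubscriptionUpdated() {
        groupEpoch++;
    }

    // An assignment epoch smaller than the group epoch means a new
    // assignment is required.
    public boolean assignmentRequired() {
        return assignmentEpoch < groupEpoch;
    }

    // Installing a new target assignment brings the assignment epoch
    // up to the group epoch.
    public void installAssignment() {
        assignmentEpoch = groupEpoch;
    }
}
```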
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203181594 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineInteger; +import org.apache.kafka.timeline.TimelineObject; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; + +/** + * A Consumer Group. All the metadata in this class are backed by + * records in the __consumer_offsets partitions. 
+ */ +public class ConsumerGroup implements Group { + +public enum ConsumerGroupState { +EMPTY("empty"), +ASSIGNING("assigning"), +RECONCILING("reconciling"), +STABLE("stable"), +DEAD("dead"); + +private final String name; + +ConsumerGroupState(String name) { +this.name = name; +} + +@Override +public String toString() { +return name; +} +} + +/** + * The snapshot registry. + */ +private final SnapshotRegistry snapshotRegistry; + +/** + * The group id. + */ +private final String groupId; + +/** + * The group state. + */ +private final TimelineObject<ConsumerGroupState> state; + +/** + * The group epoch. The epoch is incremented whenever the subscriptions + * are updated and it will trigger the computation of a new assignment + * for the group. + */ +private final TimelineInteger groupEpoch; + +/** + * The group members. + */ +private final TimelineHashMap<String, ConsumerGroupMember> members; + +/** + * The metadata of the subscribed topics. + */ +private final TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata; + +/** + * The assignment epoch. An assignment epoch smaller than the group epoch means + * that a new assignment is required. The assignment epoch is updated when a new + * assignment is installed. + */ +private final TimelineInteger assignmentEpoch; + +/** + * The target assignment. + */ +private final TimelineHashMap<String, Assignment> assignments; + +/** + * The current partition epoch maps each topic-partitions to their current epoch where + * the epoch is the epoch of their owners. When a member revokes a partition, it removes + * itself from this map. When a member gets a partition, it adds itself to this map. 
+ */ +private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch; + +public ConsumerGroup( +SnapshotRegistry snapshotRegistry, +String groupId +) { +this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry); +this.groupId = Objects.requireNonNull(groupId); +this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY); +this.groupEpoch = new TimelineInteger(snapshotRegistry); +this.members = new TimelineHashMap<>(snapshotRegistry, 0); +this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0); +this.assignmentEpoch = new TimelineInteger(snapshotRegistry); +this.assignments = new TimelineHashMap<>(snapshotRegistry, 0); +this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0); +} + +/** + * The type of this group. + * + * @return The group type (Consumer). + */ +@Override +public GroupType type() { +return GroupType.CONSUMER; +} + +/** + * The state of this group. + * + * @return The current state as a String. + */ +@Override +public String stateAsString() { +return
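The currentPartitionEpoch field quoted above is a map of maps: topic id to partition to the epoch of the owning member, with members adding themselves on acquisition and removing themselves on revocation. A hedged sketch of that bookkeeping, with a plain HashMap standing in for the TimelineHashMap and illustrative method names (acquire/release are not the actual ConsumerGroup API):

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the currentPartitionEpoch bookkeeping described in the
// quoted javadoc: topicId -> partition -> epoch of the owning member.
public class PartitionEpochSketch {
    private final Map<String, Map<Integer, Integer>> currentPartitionEpoch = new HashMap<>();

    // When a member gets a partition, it adds itself (its epoch) to the map.
    public void acquire(String topicId, int partition, int memberEpoch) {
        currentPartitionEpoch
            .computeIfAbsent(topicId, t -> new HashMap<>())
            .put(partition, memberEpoch);
    }

    // When a member revokes a partition, it removes itself from the map.
    public void release(String topicId, int partition) {
        Map<Integer, Integer> partitions = currentPartitionEpoch.get(topicId);
        if (partitions != null) {
            partitions.remove(partition);
            if (partitions.isEmpty()) currentPartitionEpoch.remove(topicId);
        }
    }

    // Epoch of the current owner, or null if the partition is free.
    public Integer epochOf(String topicId, int partition) {
        Map<Integer, Integer> partitions = currentPartitionEpoch.get(topicId);
        return partitions == null ? null : partitions.get(partition);
    }
}
```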
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203158830 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -0,0 +1,635 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineInteger; +import org.apache.kafka.timeline.TimelineObject; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * A Consumer Group. All the metadata in this class are backed by + * records in the __consumer_offsets partitions. 
+ */ +public class ConsumerGroup implements Group { + +public enum ConsumerGroupState { +EMPTY("empty"), +ASSIGNING("assigning"), +RECONCILING("reconciling"), +STABLE("stable"), +DEAD("dead"); + +private final String name; + +ConsumerGroupState(String name) { +this.name = name; +} + +@Override +public String toString() { +return name; +} +} + +/** + * The snapshot registry. + */ +private final SnapshotRegistry snapshotRegistry; + +/** + * The group id. + */ +private final String groupId; + +/** + * The group state. + */ +private final TimelineObject<ConsumerGroupState> state; + +/** + * The group epoch. The epoch is incremented whenever the subscriptions + * are updated and it will trigger the computation of a new assignment + * for the group. + */ +private final TimelineInteger groupEpoch; + +/** + * The group members. + */ +private final TimelineHashMap<String, ConsumerGroupMember> members; + +/** + * The number of members per server assignor name. + */ +private final TimelineHashMap<String, Integer> serverAssignors; + +/** + * The number of subscribers per topic. + */ +private final TimelineHashMap<String, Integer> subscribedTopicNames; + +/** + * The metadata of the subscribed topics. + */ +private final TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata; + +/** + * The assignment epoch. An assignment epoch smaller than the group epoch means + * that a new assignment is required. The assignment epoch is updated when a new + * assignment is installed. + */ +private final TimelineInteger assignmentEpoch; + +/** + * The target assignment. + */ +private final TimelineHashMap<String, Assignment> assignments; + +/** + * The current partition epoch maps each topic-partitions to their current epoch where + * the epoch is the epoch of their owners. When a member revokes a partition, it removes + * itself from this map. When a member gets a partition, it adds itself to this map. 
+ */ +private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch; + +public ConsumerGroup( +SnapshotRegistry snapshotRegistry, +String groupId +) { +this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry); +this.groupId = Objects.requireNonNull(groupId); +this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY); +this.groupEpoch = new TimelineInteger(snapshotRegistry); +this.members = new TimelineHashMap<>(snapshotRegistry, 0); +this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0); +this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0); +this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0); +this.assignmentEpoch = new TimelineInteger(snapshotRegistry); +this.assignments = new TimelineHashMap<>(snapshotRegistry, 0); +this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0); +} + +/** + * The type of
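This revision of the class adds two counter maps, serverAssignors and subscribedTopicNames, each keeping a count per key (members per assignor name, subscribers per topic). A hedged sketch of the reference-counting pattern such fields imply, with a plain HashMap standing in for the TimelineHashMap and illustrative method names:

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of a per-topic subscriber counter like subscribedTopicNames:
// increment on subscribe, decrement on unsubscribe, drop the entry at zero.
public class SubscriberCountSketch {
    private final Map<String, Integer> subscribedTopicNames = new HashMap<>();

    public void memberSubscribed(String topic) {
        subscribedTopicNames.merge(topic, 1, Integer::sum);
    }

    public void memberUnsubscribed(String topic) {
        // Returning null from computeIfPresent removes the mapping entirely.
        subscribedTopicNames.computeIfPresent(topic, (t, c) -> c == 1 ? null : c - 1);
    }

    public int subscribers(String topic) {
        return subscribedTopicNames.getOrDefault(topic, 0);
    }
}
```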
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203152667 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -0,0 +1,635 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineInteger; +import org.apache.kafka.timeline.TimelineObject; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * A Consumer Group. All the metadata in this class are backed by + * records in the __consumer_offsets partitions. 
+ */ +public class ConsumerGroup implements Group { + +public enum ConsumerGroupState { +EMPTY("empty"), +ASSIGNING("assigning"), +RECONCILING("reconciling"), +STABLE("stable"), +DEAD("dead"); + +private final String name; + +ConsumerGroupState(String name) { +this.name = name; +} + +@Override +public String toString() { +return name; +} +} + +/** + * The snapshot registry. + */ +private final SnapshotRegistry snapshotRegistry; + +/** + * The group id. + */ +private final String groupId; + +/** + * The group state. + */ +private final TimelineObject<ConsumerGroupState> state; + +/** + * The group epoch. The epoch is incremented whenever the subscriptions + * are updated and it will trigger the computation of a new assignment + * for the group. + */ +private final TimelineInteger groupEpoch; + +/** + * The group members. + */ +private final TimelineHashMap<String, ConsumerGroupMember> members; + +/** + * The number of members per server assignor name. + */ +private final TimelineHashMap<String, Integer> serverAssignors; + +/** + * The number of subscribers per topic. + */ +private final TimelineHashMap<String, Integer> subscribedTopicNames; + +/** + * The metadata of the subscribed topics. + */ +private final TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata; + +/** + * The assignment epoch. An assignment epoch smaller than the group epoch means + * that a new assignment is required. The assignment epoch is updated when a new + * assignment is installed. + */ +private final TimelineInteger assignmentEpoch; + +/** + * The target assignment. + */ +private final TimelineHashMap<String, Assignment> assignments; + +/** + * The current partition epoch maps each topic-partitions to their current epoch where + * the epoch is the epoch of their owners. When a member revokes a partition, it removes + * itself from this map. When a member gets a partition, it adds itself to this map. 
+ */ +private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch; + +public ConsumerGroup( +SnapshotRegistry snapshotRegistry, +String groupId +) { +this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry); +this.groupId = Objects.requireNonNull(groupId); +this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY); +this.groupEpoch = new TimelineInteger(snapshotRegistry); +this.members = new TimelineHashMap<>(snapshotRegistry, 0); +this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0); +this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0); +this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0); +this.assignmentEpoch = new TimelineInteger(snapshotRegistry); +this.assignments = new TimelineHashMap<>(snapshotRegistry, 0); +this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0); +} + +/** + * The type of
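Every field in the quoted class is a Timeline value tied to a SnapshotRegistry, which lets the coordinator snapshot its soft state and roll it back if a batch of records fails to commit. A toy, single-value illustration of that snapshot-and-revert idea (this is not the real SnapshotRegistry API, just a sketch of the semantics):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hedged sketch of snapshot semantics like those of TimelineInteger: take a
// snapshot at an epoch, keep mutating, and revert to the snapshot on failure.
public class TimelineIntSketch {
    private int value;
    private final Deque<long[]> snapshots = new ArrayDeque<>(); // {epoch, value}

    public void snapshot(long epoch) { snapshots.push(new long[]{epoch, value}); }
    public void set(int v) { value = v; }
    public int get() { return value; }

    // Restore the value captured at the given epoch, discarding later snapshots.
    public void revertTo(long epoch) {
        while (!snapshots.isEmpty() && snapshots.peek()[0] >= epoch) {
            value = (int) snapshots.pop()[1];
        }
    }
}
```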
[GitHub] [kafka] danicafine opened a new pull request, #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
danicafine opened a new pull request, #13751: URL: https://github.com/apache/kafka/pull/13751 Replace usage of Cluster in StreamsMetadataState with Map<String, List<PartitionInfo>>. Update StreamsPartitionAssignor#onAssignment method to pass the existing Map instead of a fake Cluster object. Behavior remains the same; updated existing unit tests accordingly. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
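The idea behind this PR is that the lookups StreamsMetadataState performs (such as the partition count of a topic) can be answered from a topic-to-partition-info map directly, without first wrapping the map in a fake Cluster object. A hedged sketch of the pattern; PartitionStub is an illustrative stand-in for Kafka's PartitionInfo, not the real class:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch: serve metadata lookups from a topic -> partition-info map,
// with no intermediate Cluster object.
public class MetadataMapSketch {
    public static final class PartitionStub {
        final String topic;
        final int partition;
        PartitionStub(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    private final Map<String, List<PartitionStub>> partitionsByTopic = new HashMap<>();

    // Register one partition of a topic, as the assignor's metadata map would.
    public void addPartition(String topic, int partition) {
        partitionsByTopic.computeIfAbsent(topic, t -> new ArrayList<>())
                         .add(new PartitionStub(topic, partition));
    }

    // The map-based equivalent of a Cluster partition-count lookup.
    public int numPartitions(String topic) {
        List<PartitionStub> infos = partitionsByTopic.get(topic);
        return infos == null ? 0 : infos.size();
    }
}
```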
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203150079 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineInteger; +import org.apache.kafka.timeline.TimelineObject; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; + +/** + * A Consumer Group. All the metadata in this class are backed by + * records in the __consumer_offsets partitions. 
+ */ +public class ConsumerGroup implements Group { + +public enum ConsumerGroupState { +EMPTY("empty"), +ASSIGNING("assigning"), +RECONCILING("reconciling"), +STABLE("stable"), +DEAD("dead"); + +private final String name; + +ConsumerGroupState(String name) { +this.name = name; +} + +@Override +public String toString() { +return name; +} +} + +/** + * The snapshot registry. + */ +private final SnapshotRegistry snapshotRegistry; + +/** + * The group id. + */ +private final String groupId; + +/** + * The group state. + */ +private final TimelineObject<ConsumerGroupState> state; + +/** + * The group epoch. The epoch is incremented whenever the subscriptions + * are updated and it will trigger the computation of a new assignment + * for the group. + */ +private final TimelineInteger groupEpoch; + +/** + * The group members. + */ +private final TimelineHashMap<String, ConsumerGroupMember> members; + +/** + * The metadata of the subscribed topics. + */ +private final TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata; + +/** + * The assignment epoch. An assignment epoch smaller than the group epoch means + * that a new assignment is required. The assignment epoch is updated when a new + * assignment is installed. + */ +private final TimelineInteger assignmentEpoch; + +/** + * The target assignment. + */ +private final TimelineHashMap<String, Assignment> assignments; + +/** + * The current partition epoch maps each topic-partitions to their current epoch where + * the epoch is the epoch of their owners. When a member revokes a partition, it removes + * itself from this map. When a member gets a partition, it adds itself to this map. 
+ */ +private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch; + +public ConsumerGroup( +SnapshotRegistry snapshotRegistry, +String groupId +) { +this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry); +this.groupId = Objects.requireNonNull(groupId); +this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY); +this.groupEpoch = new TimelineInteger(snapshotRegistry); +this.members = new TimelineHashMap<>(snapshotRegistry, 0); +this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0); +this.assignmentEpoch = new TimelineInteger(snapshotRegistry); +this.assignments = new TimelineHashMap<>(snapshotRegistry, 0); +this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0); +} + +/** + * The type of this group. + * + * @return The group type (Consumer). + */ +@Override +public GroupType type() { +return GroupType.CONSUMER; +} + +/** + * The state of this group. + * + * @return The current state as a String. + */ +@Override +public String stateAsString() { +return
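The ConsumerGroupState enum quoted above (EMPTY, ASSIGNING, RECONCILING, STABLE, DEAD) follows naturally from the epoch fields: no members means EMPTY, a group epoch ahead of the assignment epoch means ASSIGNING, and members still converging on the target assignment means RECONCILING. A hedged sketch of one way to derive it; the real transition logic in ConsumerGroup may differ, this only illustrates what the states mean:

```java
// Hedged sketch: derive a consumer group state from member count, the two
// epochs, and whether all members have reached the target assignment.
public class GroupStateSketch {
    public enum State { EMPTY, ASSIGNING, RECONCILING, STABLE, DEAD }

    public static State derive(int memberCount, int groupEpoch, int assignmentEpoch,
                               boolean allMembersConverged) {
        if (memberCount == 0) return State.EMPTY;           // no members left
        if (assignmentEpoch < groupEpoch) return State.ASSIGNING;   // assignment is stale
        if (!allMembersConverged) return State.RECONCILING; // members still moving
        return State.STABLE;
    }
}
```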
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader
gharris1727 commented on code in PR #13165: URL: https://github.com/apache/kafka/pull/13165#discussion_r1203138122 ## connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java: ## @@ -119,36 +120,37 @@ public Connect startConnect(Map<String, String> workerProps, String... extraArgs log.info("Scanning for plugin classes. This might take a moment ..."); Plugins plugins = new Plugins(workerProps); -plugins.compareAndSwapWithDelegatingLoader(); -T config = createConfig(workerProps); -log.debug("Kafka cluster ID: {}", config.kafkaClusterId()); +try (LoaderSwap loaderSwap = plugins.withClassLoader(plugins.delegatingLoader())) { Review Comment: Also here is some of the context for this change: https://github.com/apache/kafka/pull/13165#discussion_r1161929533 Since the elimination of compareAndSwap is technically unrelated to the title change, it could be moved out to its own PR. Let me know if you'd like me to separate the two changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
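The diff quoted above replaces a manual compare-and-swap of classloaders with a try-with-resources scope. The pattern is an AutoCloseable that installs a plugin classloader on the current thread and restores the previous one on close, even if the body throws. A hedged sketch of that pattern; it mirrors the intent of Plugins.withClassLoader/LoaderSwap but is not the actual Connect API:

```java
// Hedged sketch of a try-with-resources classloader swap: the constructor
// installs the given loader as the thread's context classloader, and close()
// restores the previous one.
public class LoaderSwapSketch implements AutoCloseable {
    private final ClassLoader previous;

    public LoaderSwapSketch(ClassLoader pluginLoader) {
        this.previous = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(pluginLoader);
    }

    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(previous);
    }
}
```

Usage follows the shape in the diff: `try (LoaderSwapSketch swap = new LoaderSwapSketch(pluginLoader)) { /* plugin-scoped work */ }`, which guarantees restoration on every exit path.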
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203132909 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,859 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord; + +/** + * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds + * the hard and the soft state of the groups. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a
[GitHub] [kafka] gharris1727 commented on pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader
gharris1727 commented on PR #13165: URL: https://github.com/apache/kafka/pull/13165#issuecomment-1560240995 > I'm wondering if we can get better coverage for DelegatingClassLoader::scanPluginPath. Right now we verify in PluginsTest::newConnectorShouldInstantiateWithPluginClassLoader that if we've initialized a Plugins instance, and we invoke Plugins::newConnector, the constructor for that connector is called with the correct context classloader. But it seems like this isn't very powerful since, if the constructor is invoked multiple times, the last invocation's classloader will be recorded--so in this case, we're really testing Plugins::newConnector and not the instantiations that are performed during plugin discovery. Yeah, this is a blind spot in the existing tests. The "sampling" paradigm requires an instance of the object in order to perform the assertions, and the scanPluginPath implementation throws away the objects that it creates. The test does not and cannot assert that the TCCL is correct for the first version() call, for example. In this specific case, the regression test is still sensitive, because static initialization happens when the plugin constructor is first called (not when the Class object is created). This means that we can assert the TCCL used in the first constructor via the staticClassloader inspection. I think the alternative would involve mocking/spying part of scanPluginPath (such as versionFor), or keeping track of instantiated objects in SamplingTestPlugins, both of which seem messy and would make this harder to refactor in the near future. Do you think this should be addressed now, or can it wait until the plugin path scanning refactor is landed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
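The timing argument above -- static initialization runs on the first constructor call, not when the Class object is created, so a static field can capture the thread context classloader (TCCL) of that first instantiation -- can be sketched in plain Java. This is a hypothetical standalone demo, not the actual Kafka Connect SamplingTestPlugins code; the class and field names are invented for illustration.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class StaticInitDemo {

    static class SamplingPlugin {
        // The static initializer runs once, on first active use of the class
        // (here: the first constructor call), capturing whatever TCCL is set
        // on the current thread at that moment.
        static final ClassLoader STATIC_CLASSLOADER =
                Thread.currentThread().getContextClassLoader();

        // Captured on every instantiation; if recorded into a shared field,
        // later calls would overwrite earlier ones -- the blind spot above.
        final ClassLoader constructorClassLoader =
                Thread.currentThread().getContextClassLoader();
    }

    // Returns {static field saw plugin loader, second instance saw plugin loader}.
    static boolean[] runDemo() {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        ClassLoader pluginLoader = new URLClassLoader(new URL[0], original);

        Thread.currentThread().setContextClassLoader(pluginLoader);
        new SamplingPlugin();                         // triggers static init under pluginLoader
        Thread.currentThread().setContextClassLoader(original);
        SamplingPlugin second = new SamplingPlugin(); // static init already done

        return new boolean[] {
                SamplingPlugin.STATIC_CLASSLOADER == pluginLoader,
                second.constructorClassLoader == pluginLoader
        };
    }

    public static void main(String[] args) {
        boolean[] r = runDemo();
        System.out.println("static field saw plugin loader: " + r[0]);    // true
        System.out.println("second instance saw plugin loader: " + r[1]); // false
    }
}
```

Because the static field is written exactly once, asserting on it is sensitive to the TCCL of the first instantiation even when later instantiations happen under a different loader, which is why the staticClassloader inspection still catches the regression.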
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203119594 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,865 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord; + +/** + * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds + * the hard and the soft state of the groups. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203106357 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,865 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord; + +/** + * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds + * the hard and the soft state of the groups. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203084630 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,859 @@ (The quoted new-file hunk is truncated in this digest: it contains only the Apache license header, the import block, and the opening of the class javadoc, which begins: "The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds the hard and the soft state of the groups." The review comment text itself is not captured.)
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203083017 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,859 @@ (quotes the same truncated new-file hunk as the first comment above; the review comment text is not captured in this digest)
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203079411 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,859 @@ (quotes the same truncated new-file hunk as the first comment above; the review comment text is not captured in this digest)
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1203077539 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,859 @@ (quotes the same truncated new-file hunk as the first comment above; the review comment text is not captured in this digest)
[GitHub] [kafka] clayburn commented on pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights
clayburn commented on PR #13676: URL: https://github.com/apache/kafka/pull/13676#issuecomment-1560105054 I updated this PR to the latest version of Gradle Enterprise Gradle Plugin. Is there anything else I need to do to get this PR integrated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader
gharris1727 commented on code in PR #13165: URL: https://github.com/apache/kafka/pull/13165#discussion_r1202962972

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
## @@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable {

     private final ClassLoader savedLoader;

-    public LoaderSwap(ClassLoader savedLoader) {
+    public static LoaderSwap use(ClassLoader loader) {
+        ClassLoader savedLoader = compareAndSwapLoaders(loader);
+        try {
+            return new LoaderSwap(savedLoader);
+        } catch (Throwable t) {
+            compareAndSwapLoaders(savedLoader);
+            throw t;
+        }
+    }

Review Comment: This is not re-introducing the static logic; it is just a refactoring that eliminates the open-ended Plugins.compareAndSwap* methods. This method is only called in two places: by DelegatingClassLoader.scanPluginPath (before scanning is finished) and by Plugins.withClassLoader (after scanning is finished). I've dropped the visibility and made the DelegatingClassLoader call site mockable.

## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
## @@ -89,9 +88,6 @@ public class EmbeddedConnectCluster {

     private final String workerNamePrefix;
     private final AtomicInteger nextWorkerId = new AtomicInteger(0);
     private final EmbeddedConnectClusterAssertions assertions;
-    // we should keep the original class loader and set it back after connector stopped since the connector will change the class loader,
-    // and then, the Mockito will use the unexpected class loader to generate the wrong proxy instance, which makes mock failed
-    private final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();

Review Comment: I disagree. I think that this is a symptom of the open-ended context classloader swap having unintended downstream effects. The existing fix is adequate, but it is mostly addressing the symptom rather than the problem.
## connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
## @@ -119,36 +120,37 @@ public Connect startConnect(Map workerProps, String... extraArgs

         log.info("Scanning for plugin classes. This might take a moment ...");
         Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        T config = createConfig(workerProps);
-        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment: I understand that this is a change in semantics, but that change is intentional. After this method completes, operations should not require the delegating loader and should be performed via the Connect handle. That handle only has methods for starting, stopping, and interacting with the REST API, all of which should internally handle setting the context classloader when appropriate. The reason that I'm changing this is that I think the open-ended swap methods are an anti-pattern, and lead to unexpected behavior later in the caller thread.
[GitHub] [kafka] C0urante commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader
C0urante commented on code in PR #13165: URL: https://github.com/apache/kafka/pull/13165#discussion_r1202903632

## connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
## @@ -119,36 +120,37 @@ public Connect startConnect(Map workerProps, String... extraArgs (quotes the same startConnect hunk shown above)

Review Comment: This is actually incorrect; we want the delegating loader to remain the classloader even after this method exits (normally or exceptionally).

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
## @@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable { (quotes the same LoaderSwap.use hunk shown above)

Review Comment: Adding static logic that invokes `compareAndSwapLoaders` is difficult to test, which was the motivation for KAFKA-14346. Can we try not to re-introduce that kind of static logic?
## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
## @@ -360,17 +360,19 @@ private PluginScanResult scanPluginPath(

         builder.useParallelExecutor();
         Reflections reflections = new InternalReflections(builder);

-        return new PluginScanResult(
-            getPluginDesc(reflections, SinkConnector.class, loader),
-            getPluginDesc(reflections, SourceConnector.class, loader),
-            getPluginDesc(reflections, Converter.class, loader),
-            getPluginDesc(reflections, HeaderConverter.class, loader),
-            getTransformationPluginDesc(loader, reflections),
-            getPredicatePluginDesc(loader, reflections),
-            getServiceLoaderPluginDesc(ConfigProvider.class, loader),
-            getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
-            getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
-        );
+        try (LoaderSwap loaderSwap = LoaderSwap.use(loader)) {

Review Comment: If static initialization logic for a plugin class changes the context classloader, then that will remain the classloader for the rest of the plugin scanning that takes place in this method. I don't think we have to accommodate this case, but if there's an easy way to, we might try.

## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
## @@ -89,9 +88,6 @@ public class EmbeddedConnectCluster { (quotes the same EmbeddedConnectCluster hunk shown above)

Review Comment: I think we need to keep this since the change to `AbstractConnectCli::startConnect` is incorrect.
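The pattern both reviewers are weighing above can be summarized as: swap the thread's context classloader, return an AutoCloseable handle holding the previous loader, and let try-with-resources guarantee restoration. Below is a minimal, self-contained sketch of that pattern. The class and method names mirror the PR (`LoaderSwap`, `use`), but this standalone version calls `Thread.currentThread()` directly instead of the PR's `compareAndSwapLoaders` helper, so it is an illustration of the technique, not the actual Connect runtime code.

```java
// Hypothetical standalone sketch of the scoped context-classloader swap
// discussed in PR #13165; not the real Connect implementation.
public class LoaderSwapDemo {

    /** Restores the saved context classloader when closed. */
    static final class LoaderSwap implements AutoCloseable {
        private final ClassLoader savedLoader;

        private LoaderSwap(ClassLoader savedLoader) {
            this.savedLoader = savedLoader;
        }

        /** Swap the current thread's context classloader; the returned handle undoes it. */
        static LoaderSwap use(ClassLoader loader) {
            ClassLoader saved = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(loader);
            return new LoaderSwap(saved);
        }

        @Override
        public void close() {
            Thread.currentThread().setContextClassLoader(savedLoader);
        }
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // Stand-in for a plugin classloader (empty URL list, parent = original).
        ClassLoader plugin = new java.net.URLClassLoader(new java.net.URL[0], original);

        try (LoaderSwap swap = LoaderSwap.use(plugin)) {
            // Inside the block, the plugin loader is the context classloader ...
            if (Thread.currentThread().getContextClassLoader() != plugin) {
                throw new AssertionError("swap failed");
            }
        }
        // ... and the original loader is restored afterwards, even on exception.
        if (Thread.currentThread().getContextClassLoader() != original) {
            throw new AssertionError("restore failed");
        }
        System.out.println("classloader restored: true");
    }
}
```

The try-with-resources form is what makes the swap scoped: whether the guarded block returns normally or throws, the saved loader is restored on exit, which the open-ended compareAndSwap* calls left up to the caller.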
[GitHub] [kafka] divijvaidya commented on pull request #13749: KAFKA-15016: Update LICENSE-binary file
divijvaidya commented on PR #13749: URL: https://github.com/apache/kafka/pull/13749#issuecomment-1559996120 `bcpkix` is still missing https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L61. Do you happen to know why?
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1202846585 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,859 @@ (quotes the same truncated new-file hunk as the first comment above; the review comment text is not captured in this digest)
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1202846284 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,859 @@ (quotes the same truncated new-file hunk as the first comment above; the review comment text is not captured in this digest)
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1202840230 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## (diff context identical to the earlier comment on this file)
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1202837929 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## (diff context identical to the earlier comment on this file)
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13733: KAFKA-13337: fix of possible java.nio.file.AccessDeniedException during Connect plugin directory scan
gharris1727 commented on code in PR #13733: URL: https://github.com/apache/kafka/pull/13733#discussion_r1202806994 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java: ## @@ -22,6 +22,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.IOException; Review Comment: There's one more leaked classloader, in testLoadingMixOfValidAndInvalidPlugins if you want to fix that as well. I don't see these warnings as relevant, since this is test code. But I won't stop you from adding the try-with-resources. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
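The leaked-classloader warning the comment refers to can be avoided with try-with-resources, since URLClassLoader (which Connect's plugin classloaders extend) is Closeable. A minimal, self-contained sketch, not the PR's actual test code:

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderCleanup {
    public static String loadViaClosedLoader() throws Exception {
        // try-with-resources closes the classloader when the block exits,
        // avoiding the "leaked classloader" warning mentioned in the review.
        try (URLClassLoader loader = new URLClassLoader(new URL[0])) {
            return loader.loadClass("java.lang.String").getName(); // delegates to parent loader
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(loadViaClosedLoader()); // java.lang.String
    }
}
```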
[GitHub] [kafka] gharris1727 commented on pull request #9910: KAFKA-10877
gharris1727 commented on PR #9910: URL: https://github.com/apache/kafka/pull/9910#issuecomment-1559871005 @smccauliff Are you still interested in making this change? I think this is still affecting users and wasting CPU cycles :)
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1202738629 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS }); } +/** + * Alter a connector's offsets. + * + * @param connName the name of the connector whose offsets are to be altered + * @param connectorConfig the connector's configuration + * @param offsets a mapping from partitions to offsets that need to be overwritten + * @param cb callback to invoke upon completion + */ +public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) { +String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); +ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias); +Connector connector; + +try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { +connector = plugins.newConnector(connectorClassOrAlias); +if (ConnectUtils.isSinkConnector(connector)) { +log.debug("Altering consumer group offsets for sink connector: {}", connName); +alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); +} else { +log.debug("Altering offsets for source connector: {}", connName); +alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); +} +} +} + +/** + * Alter a sink connector's consumer group offsets. + * + * Visible for testing.
 + * + * @param connName the name of the sink connector whose offsets are to be altered + * @param connector an instance of the sink connector + * @param connectorConfig the sink connector's configuration + * @param offsets a mapping from topic partitions to offsets that need to be overwritten + * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader + * @param cb callback to invoke upon completion + */ +void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) { +executor.submit(plugins.withClassLoader(connectorLoader, () -> { +try { +Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets); +boolean alterOffsetsResult; +try { +alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets); +} catch (UnsupportedOperationException e) { +throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +"modification of offsets", e); +} + +Class<? extends Connector> sinkConnectorClass = connector.getClass(); +Map<String, Object> adminConfig = adminConfigs( +connName, +"connector-worker-adminclient-" + connName, +config, +new SinkConnectorConfig(plugins, connectorConfig), +sinkConnectorClass, +connectorClientConfigOverridePolicy, +kafkaClusterId, +ConnectorType.SINK); + +SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig); +String groupId = (String) baseConsumerConfigs( +connName, "connector-consumer-", config, sinkConnectorConfig, +sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG); + +Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() +.stream() +.filter(entry -> entry.getValue() != null) +.map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue()))) +.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)); + +Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() +.stream() +.filter(entry -> entry.getValue() == null) +.map(Map.Entry::getKey) +.collect(Collectors.toSet()); + +KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null); + +Admin admin = adminFactory.apply(adminConfig); + +try {
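The stream pipeline above partitions the parsed offsets into two groups: entries with non-null values become offsets to alter, and null-valued entries mark partitions to reset. That split can be sketched in isolation (plain Java, with String keys standing in for Kafka's TopicPartition):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class OffsetSplit {
    // Entries with non-null values: offsets to alter via the admin client.
    public static Map<String, Long> offsetsToAlter(Map<String, Long> parsed) {
        return parsed.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Entries with null values: partitions whose offsets should be reset.
    public static Set<String> partitionsToReset(Map<String, Long> parsed) {
        return parsed.entrySet().stream()
                .filter(e -> e.getValue() == null)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Map<String, Long> parsed = new HashMap<>();
        parsed.put("topic-0", 1000L);
        parsed.put("topic-1", null); // tombstone: reset this partition
        System.out.println(offsetsToAlter(parsed));   // {topic-0=1000}
        System.out.println(partitionsToReset(parsed)); // [topic-1]
    }
}
```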
[jira] [Commented] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors
[ https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725502#comment-17725502 ] Chris Egerton commented on KAFKA-15018: --- One possible fix for this could be to preemptively write tombstone offsets to the global offsets topic before writing any offsets to the per-connector offsets topic, and preserve the existing write logic for all non-tombstone offsets. Tombstone offsets should be fairly rare and so in the common case, this will have no impact on connector performance or availability. However, when this case is hit, the proposed fix would require two synchronous writes to topics that are potentially hosted on different clusters. This is not ideal, but it's unclear whether a better alternative exists. > Potential tombstone offsets corruption for exactly-once source connectors > - > > Key: KAFKA-15018 > URL: https://issues.apache.org/jira/browse/KAFKA-15018 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1 >Reporter: Chris Egerton >Priority: Major > > When exactly-once support is enabled for source connectors, source offsets > can potentially be written to two different offsets topics: a topic specific > to the connector, and the global offsets topic (which was used for all > connectors prior to KIP-618 / version 3.3.0). > Precedence is given to offsets in the per-connector offsets topic, but if > none are found for a given partition, then the global offsets topic is used > as a fallback. > When committing offsets, a transaction is used to ensure that source records > and source offsets are written to the Kafka cluster targeted by the source > connector. This transaction only includes the connector-specific offsets > topic. 
Writes to the global offsets topic take place after writes to the > connector-specific offsets topic have completed successfully, and if they > fail, a warning message is logged, but no other action is taken. > Normally, this ensures that, for offsets committed by exactly-once-supported > source connectors, the per-connector offsets topic is at least as up-to-date > as the global offsets topic, and sometimes even ahead. > However, for tombstone offsets, we lose that guarantee. If a tombstone offset > is successfully written to the per-connector offsets topic, but cannot be > written to the global offsets topic, then the global offsets topic will still > contain that source offset, but the per-connector topic will not. Due to the > fallback-on-global logic used by the worker, if a task requests offsets for > one of the tombstoned partitions, the worker will provide it with the offsets > present in the global offsets topic, instead of indicating to the task that > no offsets can be found. -- This message was sent by Atlassian Jira (v8.20.10#820010)
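The proposed ordering can be sketched with plain maps standing in for the two offsets topics (illustrative names, not Connect's actual API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed fix: apply tombstone offsets to the global topic
// *before* writing anything to the per-connector topic, so a failed global
// write aborts the commit instead of leaving a stale global entry. The
// existing best-effort global write is preserved for non-tombstone offsets.
public class TombstoneFirstCommit {
    public static void commit(Map<String, Long> offsets,
                              Map<String, Long> globalTopic,
                              Map<String, Long> connectorTopic) {
        // 1. Preemptively apply tombstones (null values) to the global store.
        offsets.forEach((partition, offset) -> {
            if (offset == null) globalTopic.remove(partition);
        });
        // 2. Write everything to the per-connector store (transactionally in
        //    the real worker).
        offsets.forEach((partition, offset) -> {
            if (offset == null) connectorTopic.remove(partition);
            else connectorTopic.put(partition, offset);
        });
        // 3. Non-tombstone offsets keep the existing best-effort global write.
        offsets.forEach((partition, offset) -> {
            if (offset != null) globalTopic.put(partition, offset);
        });
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("p1", null); // tombstone
        offsets.put("p2", 7L);
        Map<String, Long> global = new HashMap<>(Map.of("p1", 5L));
        Map<String, Long> connector = new HashMap<>(Map.of("p1", 5L));
        commit(offsets, global, connector);
        System.out.println(global);    // {p2=7}
        System.out.println(connector); // {p2=7}
    }
}
```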
[jira] [Created] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors
Chris Egerton created KAFKA-15018: - Summary: Potential tombstone offsets corruption for exactly-once source connectors Key: KAFKA-15018 URL: https://issues.apache.org/jira/browse/KAFKA-15018 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 3.3.2, 3.3.1, 3.4.0, 3.3.0, 3.5.0, 3.4.1 Reporter: Chris Egerton When exactly-once support is enabled for source connectors, source offsets can potentially be written to two different offsets topics: a topic specific to the connector, and the global offsets topic (which was used for all connectors prior to KIP-618 / version 3.3.0). Precedence is given to offsets in the per-connector offsets topic, but if none are found for a given partition, then the global offsets topic is used as a fallback. When committing offsets, a transaction is used to ensure that source records and source offsets are written to the Kafka cluster targeted by the source connector. This transaction only includes the connector-specific offsets topic. Writes to the global offsets topic take place after writes to the connector-specific offsets topic have completed successfully, and if they fail, a warning message is logged, but no other action is taken. Normally, this ensures that, for offsets committed by exactly-once-supported source connectors, the per-connector offsets topic is at least as up-to-date as the global offsets topic, and sometimes even ahead. However, for tombstone offsets, we lose that guarantee. If a tombstone offset is successfully written to the per-connector offsets topic, but cannot be written to the global offsets topic, then the global offsets topic will still contain that source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. 
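The fallback behavior that causes the corruption described above can be demonstrated with plain maps standing in for the two offsets topics (illustrative, not Connect's actual store API):

```java
import java.util.HashMap;
import java.util.Map;

public class FallbackBug {
    // Worker-style lookup: prefer the per-connector topic, fall back to the
    // global topic when the partition is absent there.
    public static Long lookup(String partition,
                              Map<String, Long> connectorTopic,
                              Map<String, Long> globalTopic) {
        return connectorTopic.containsKey(partition)
                ? connectorTopic.get(partition)
                : globalTopic.get(partition);
    }

    public static void main(String[] args) {
        Map<String, Long> connector = new HashMap<>();
        Map<String, Long> global = new HashMap<>();
        connector.put("p1", 5L);
        global.put("p1", 5L);
        // Tombstone lands in the per-connector topic (deleting the key)...
        connector.remove("p1");
        // ...but the follow-up write to the global topic fails, so the stale
        // entry survives and the fallback resurrects it.
        System.out.println(lookup("p1", connector, global)); // 5 (stale; should be null)
    }
}
```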
[GitHub] [kafka] cmccabe closed pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage
cmccabe closed pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage URL: https://github.com/apache/kafka/pull/13686
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1202709620 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## (diff context identical to the earlier comment on this file)
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1202694716 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
```
@@ -1235,6 +1244,246 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
     });
 }
+/**
+ * Alter a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be altered
+ * @param connectorConfig the connector's configurations
+ * @param offsets a mapping from partitions (either source partitions for source connectors, or Kafka topic
+ *                partitions for sink connectors) to offsets that need to be written; may not be null or empty
+ * @param cb callback to invoke upon completion
+ */
+public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                  Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+
+    if (offsets == null || offsets.isEmpty()) {
+        throw new ConnectException("The offsets to be altered may not be null or empty");
+    }
+
+    String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+    ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+    Connector connector;
+
+    try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+        connector = plugins.newConnector(connectorClassOrAlias);
+        if (ConnectUtils.isSinkConnector(connector)) {
+            log.debug("Altering consumer group offsets for sink connector: {}", connName);
+            alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+        } else {
+            log.debug("Altering offsets for source connector: {}", connName);
+            alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+        }
+    }
+}
+
+/**
+ * Alter a sink connector's consumer group offsets.
+ *
+ * Visible for testing.
+ *
+ * @param connName the name of the sink connector whose offsets are to be altered
+ * @param connector an instance of the sink connector
+ * @param connectorConfig the sink connector's configuration
+ * @param offsets a mapping from topic partitions to offsets that need to be written; may not be null or empty
+ * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+ * @param cb callback to invoke upon completion
+ */
+void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                               Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+    executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+        try {
+            Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(offsets);
+            boolean alterOffsetsResult;
+            try {
+                alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+            } catch (UnsupportedOperationException e) {
+                throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                        "modification of offsets", e);
+            }
+
+            SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+            Class<? extends Connector> sinkConnectorClass = connector.getClass();
+            Map<String, Object> adminConfig = adminConfigs(
+                    connName,
+                    "connector-worker-adminclient-" + connName,
+                    config,
+                    sinkConnectorConfig,
+                    sinkConnectorClass,
+                    connectorClientConfigOverridePolicy,
+                    kafkaClusterId,
+                    ConnectorType.SINK);
+
+            String groupId = (String) baseConsumerConfigs(
+                    connName, "connector-consumer-", config, sinkConnectorConfig,
+                    sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+            Admin admin = adminFactory.apply(adminConfig);
+
+            try {
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
```
Review Comment: Can we construct a `List<KafkaFuture<Void>>` here and add elements to that as necessary (i.e., depending on whether the sets of to-be-removed and to-be-altered partitions are empty), then combine them with `KafkaFuture<Void> adminFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));`? The use of a no-op future is a little hard to read.
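The pattern suggested in the review, collecting only the futures that are actually needed and then combining them, can be sketched with the standard library's `CompletableFuture` standing in for Kafka's `KafkaFuture` (which exposes an analogous `allOf`). The flags and comments below are illustrative, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CombineFutures {
    /** Combines the collected futures; the result completes once all of them complete. */
    static CompletableFuture<Void> combine(List<CompletableFuture<Void>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for "are there offsets to delete / to alter".
        boolean haveOffsetsToDelete = true;
        boolean haveOffsetsToAlter = false;

        // Add a future only when the corresponding admin call is actually needed,
        // instead of seeding the chain with a no-op completed future.
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        if (haveOffsetsToDelete) {
            futures.add(CompletableFuture.runAsync(() -> { /* e.g. delete consumer group offsets */ }));
        }
        if (haveOffsetsToAlter) {
            futures.add(CompletableFuture.runAsync(() -> { /* e.g. alter consumer group offsets */ }));
        }

        combine(futures).join();
        System.out.println("completed " + futures.size() + " future(s)");
    }
}
```

Seeding with a completed no-op future also works, but an empty list combined by `allOf` expresses "nothing to do" directly, which is the readability point the reviewer is making.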
[GitHub] [kafka] yashmayya opened a new pull request, #13750: MINOR: Handle the config topic read timeout edge case in DistributedHerder's stopConnector method
yashmayya opened a new pull request, #13750: URL: https://github.com/apache/kafka/pull/13750 - Handle the config topic read timeout edge case in `DistributedHerder::stopConnector` - https://github.com/apache/kafka/pull/13465#discussion_r1200500990 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1202684769 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java:
```
@@ -52,4 +53,82 @@ public static ConnectorOffsets consumerGroupOffsetsToConnectorOffsets(Map
+ * {
+ *   "kafka_topic": "topic"
+ *   "kafka_partition": 3
+ * }
+ *
+ * and that the provided offsets (values in the {@code partitionOffsets} map) look like:
+ *
+ * {
+ *   "kafka_offset": 1000
+ * }
+ *
+ * This method then parses them into a mapping from {@link TopicPartition}s to their corresponding {@link Long}
+ * valued offsets.
+ *
+ * @param partitionOffsets the partitions to offset map that needs to be validated and parsed.
+ * @return the parsed mapping from {@link TopicPartition} to its corresponding {@link Long} valued offset.
+ *
+ * @throws BadRequestException if the provided offsets aren't in the expected format
+ */
+public static Map<TopicPartition, Long> validateAndParseSinkConnectorOffsets(Map<Map<String, ?>, Map<String, ?>> partitionOffsets) {
```
Review Comment: Well done, looks great
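The format described in that javadoc can be illustrated with a self-contained sketch. Plain `Map`s stand in for the deserialized JSON, a local class stands in for `org.apache.kafka.common.TopicPartition`, and `IllegalArgumentException` stands in for Connect's `BadRequestException`; this is an approximation of the behavior, not the actual `SinkUtils` implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class SinkOffsetsSketch {
    // Local stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public boolean equals(Object o) {
            return o instanceof TopicPartition
                    && ((TopicPartition) o).topic.equals(topic)
                    && ((TopicPartition) o).partition == partition;
        }
        @Override public int hashCode() { return topic.hashCode() * 31 + partition; }
        @Override public String toString() { return topic + "-" + partition; }
    }

    /**
     * Parses entries shaped like {"kafka_topic": ..., "kafka_partition": ...} ->
     * {"kafka_offset": ...} into a TopicPartition -> Long map, rejecting malformed input.
     */
    static Map<TopicPartition, Long> parse(Map<Map<String, ?>, Map<String, ?>> partitionOffsets) {
        Map<TopicPartition, Long> parsed = new HashMap<>();
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : partitionOffsets.entrySet()) {
            Object topic = entry.getKey().get("kafka_topic");
            Object partition = entry.getKey().get("kafka_partition");
            Object offset = entry.getValue().get("kafka_offset");
            if (!(topic instanceof String) || !(partition instanceof Number) || !(offset instanceof Number)) {
                throw new IllegalArgumentException("Provided offsets are not in the expected format");
            }
            parsed.put(new TopicPartition((String) topic, ((Number) partition).intValue()),
                    ((Number) offset).longValue());
        }
        return parsed;
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> result =
                parse(Map.of(Map.of("kafka_topic", "topic", "kafka_partition", 3),
                             Map.of("kafka_offset", 1000)));
        System.out.println(result);
    }
}
```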
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1202678391 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback cb) { ); } +@Override +public void alterConnectorOffsets(String connName, Map, Map> offsets, Callback callback) { +log.trace("Submitting alter offsets request for connector '{}'", connName); + +addRequest(() -> { +refreshConfigSnapshot(workerSyncTimeoutMs); +if (!alterConnectorOffsetsChecks(connName, callback)) { +return null; +} +// At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run +// a zombie fencing request +if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) { +log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName); +getFenceZombieSourceTasksCallable(connName, (error, ignored) -> { +if (error != null) { +log.error("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", error); +callback.onCompletion(new ConnectException("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", +error), null); +} else { +log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName); +// We need to ensure that we perform the necessary checks again inside alterConnectorOffsetsHerderRequest +// since it is being run in a separate herder request and the conditions could have changed since the +// previous check +addRequest(getAlterConnectorOffsetsCallable(connName, offsets, callback), forwardErrorCallback(callback)); +} +}).call(); +} else { +getAlterConnectorOffsetsCallable(connName, offsets, callback).call(); Review Comment: One convention I've seen used is `foo` (public) and `doFoo` 
(internal). I'm not a huge fan of it but in some cases it can be useful. Also, are we issuing unnecessary calls to `alterConnectorOffsetsChecks`? We do a check in the beginning of `alterConnectorOffsets` and then, if the connector is a sink (or exactly-once source support is disabled), we immediately invoke the `Callable` returned by `getAlterConnectorOffsetsCallable`, which calls `alterConnectorOffsetsChecks` a second time. We might just get rid of `getAlterConnectorOffsetsCallable` altogether and inline the logic for it?
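For readers unfamiliar with the `foo`/`doFoo` convention mentioned here, a minimal generic illustration (the names are invented for this sketch, not Kafka's):

```java
public class NamingSketch {
    // Public entry point: validates input, then delegates to the internal variant.
    public String greet(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("name may not be null or empty");
        }
        return doGreet(name);
    }

    // Internal implementation; call sites that have already validated can reuse it
    // without repeating the checks.
    private String doGreet(String name) {
        return "hello, " + name;
    }

    public static void main(String[] args) {
        System.out.println(new NamingSketch().greet("connect"));
    }
}
```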
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1202644196 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback cb) { ); } +@Override +public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) { +log.trace("Submitting alter offsets request for connector '{}'", connName); + +addRequest(() -> { +refreshConfigSnapshot(workerSyncTimeoutMs); Review Comment: Good point--yes, we should handle that in `stopConnector` as well (although we don't have to do that in this PR).
[GitHub] [kafka] KarboniteKream commented on pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse
KarboniteKream commented on PR #13679: URL: https://github.com/apache/kafka/pull/13679#issuecomment-1559681643 This PR seems to have introduced a regression (confirmed using bisect). In a simple setup of two controllers using `config/kraft/controller.properties`, after the leader is shut down and restarted, `UNKNOWN_SERVER_EXCEPTION` will be thrown by `ApiVersionsRequest`. The other controller sees the following exception:
```
[2023-05-19 15:50:18,834] WARN [QuorumController id=0] getFinalizedFeatures: failed with unknown server exception RuntimeException in 28 us. The controller is already in standby mode. (org.apache.kafka.controller.QuorumController)
java.lang.RuntimeException: No in-memory snapshot for epoch 84310. Snapshot epochs are: 61900
    at org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:173)
    at org.apache.kafka.timeline.SnapshotRegistry.iterator(SnapshotRegistry.java:131)
    at org.apache.kafka.timeline.TimelineObject.get(TimelineObject.java:69)
    at org.apache.kafka.controller.FeatureControlManager.finalizedFeatures(FeatureControlManager.java:303)
    at org.apache.kafka.controller.QuorumController.lambda$finalizedFeatures$16(QuorumController.java:2016)
    at org.apache.kafka.controller.QuorumController$ControllerReadEvent.run(QuorumController.java:546)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:829)
```
A similar issue was reported in [KAFKA-14996](https://issues.apache.org/jira/browse/KAFKA-14996). I'll report this in Jira once my account is approved.
[GitHub] [kafka] mimaison opened a new pull request, #13749: KAFKA-15016: Update LICENSE-binary file
mimaison opened a new pull request, #13749: URL: https://github.com/apache/kafka/pull/13749 The file was getting a bit out of sync with the actual dependencies we ship. Also, the [process we follow each release](https://issues.apache.org/jira/browse/KAFKA-12622) catches missing licenses but does not remove entries for dependencies that are no longer used, so this also removes a few unnecessary entries. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mumrah merged pull request #13741: KAFKA-15009: Handle errors while migrating ACL metadata from snapshot to Zk during DUAL_WRITE mode.
mumrah merged PR #13741: URL: https://github.com/apache/kafka/pull/13741
[jira] [Updated] (KAFKA-15009) New ACLs are not written to ZK during migration
[ https://issues.apache.org/jira/browse/KAFKA-15009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-15009: - Component/s: kraft > New ACLs are not written to ZK during migration > --- > > Key: KAFKA-15009 > URL: https://issues.apache.org/jira/browse/KAFKA-15009 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.5.0 >Reporter: Akhilesh Chaganti >Assignee: Akhilesh Chaganti >Priority: Blocker > Labels: kraft, migration > > While handling snapshots in dual-write mode, we are missing the logic to > detect new ACLs created in KRaft. This means we will not write these new ACLs > back to ZK and they would be missing if a user rolled back their cluster to > ZK mode. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mumrah commented on pull request #13741: KAFKA-15009: Handle errors while migrating ACL metadata from snapshot to Zk during DUAL_WRITE mode.
mumrah commented on PR #13741: URL: https://github.com/apache/kafka/pull/13741#issuecomment-1559562400 Ok, I split out the client quotas work into KAFKA-15017
[jira] [Created] (KAFKA-15017) New ClientQuotas are not written to ZK from snapshot
David Arthur created KAFKA-15017: Summary: New ClientQuotas are not written to ZK from snapshot Key: KAFKA-15017 URL: https://issues.apache.org/jira/browse/KAFKA-15017 Project: Kafka Issue Type: Bug Components: kraft Affects Versions: 3.5.0 Reporter: David Arthur Similar issue to KAFKA-15009
[jira] [Updated] (KAFKA-15009) New ACLs are not written to ZK during migration
[ https://issues.apache.org/jira/browse/KAFKA-15009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-15009: - Description: While handling snapshots in dual-write mode, we are missing the logic to detect new ACLs created in KRaft. This means we will not write these new ACLs back to ZK and they would be missing if a user rolled back their cluster to ZK mode. > New ACLs are not written to ZK during migration > --- > > Key: KAFKA-15009 > URL: https://issues.apache.org/jira/browse/KAFKA-15009 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0 >Reporter: Akhilesh Chaganti >Assignee: Akhilesh Chaganti >Priority: Blocker > Labels: kraft, migration > > While handling snapshots in dual-write mode, we are missing the logic to > detect new ACLs created in KRaft. This means we will not write these new ACLs > back to ZK and they would be missing if a user rolled back their cluster to > ZK mode.
[jira] [Assigned] (KAFKA-15009) New ACLs are not written to ZK during migration
[ https://issues.apache.org/jira/browse/KAFKA-15009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur reassigned KAFKA-15009: Assignee: Akhilesh Chaganti (was: David Arthur) > New ACLs are not written to ZK during migration > --- > > Key: KAFKA-15009 > URL: https://issues.apache.org/jira/browse/KAFKA-15009 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0 >Reporter: Akhilesh Chaganti >Assignee: Akhilesh Chaganti >Priority: Blocker > Labels: kraft, migration >
[jira] [Assigned] (KAFKA-15009) New ACLs are not written to ZK during migration
[ https://issues.apache.org/jira/browse/KAFKA-15009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur reassigned KAFKA-15009: Assignee: David Arthur (was: Akhilesh Chaganti) > New ACLs are not written to ZK during migration > --- > > Key: KAFKA-15009 > URL: https://issues.apache.org/jira/browse/KAFKA-15009 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0 >Reporter: Akhilesh Chaganti >Assignee: David Arthur >Priority: Blocker > Labels: kraft, migration >
[jira] [Updated] (KAFKA-15009) New ACLs are not written to ZK during migration
[ https://issues.apache.org/jira/browse/KAFKA-15009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-15009: - Summary: New ACLs are not written to ZK during migration (was: ClientQuotas and ACLs are not correctly synchronized while handling snapshot during migration (dual write)) > New ACLs are not written to ZK during migration > --- > > Key: KAFKA-15009 > URL: https://issues.apache.org/jira/browse/KAFKA-15009 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0 >Reporter: Akhilesh Chaganti >Assignee: Akhilesh Chaganti >Priority: Blocker > Labels: kraft, migration >
[GitHub] [kafka] krespo opened a new pull request, #13748: [BUGFIX] Bugfixed in KAFKA-8713, but it doesn't work properly.
krespo opened a new pull request, #13748: URL: https://github.com/apache/kafka/pull/13748 Through the ticket ["KAFKA-8713"](https://issues.apache.org/jira/browse/KAFKA-8713), a bug where JsonConverter always outputs the default value for a nullable schema was fixed. After applying the bug fix, I set the following setting in Kafka Connect: ` "value.converter.replace.null.with.default": "false", ` However, I found that the same problem still occurred, so I modified the code to fix this bug. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac commented on a diff in pull request #13747: MINOR: Fix ListOffsetsRequestTest.testResponseIncludesLeaderEpoch
dajac commented on code in PR #13747: URL: https://github.com/apache/kafka/pull/13747#discussion_r1202331207 ## core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala: ## @@ -192,7 +192,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { killBroker(firstLeaderId) val secondLeaderId = TestUtils.awaitLeaderChange(servers, partition, firstLeaderId) // make sure high watermark of new leader has caught up -TestUtils.waitUntilTrue(() => sendRequest(secondLeaderId, 0L, -1).errorCode() != Errors.OFFSET_NOT_AVAILABLE.code(), +TestUtils.waitUntilTrue(() => sendRequest(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1).errorCode != Errors.OFFSET_NOT_AVAILABLE.code, Review Comment: `OFFSET_NOT_AVAILABLE` is only returned when `LATEST_TIMESTAMP` or `MAX_TIMESTAMP` are used.
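The `TestUtils.waitUntilTrue` idiom used in this fix, polling a condition until it holds or a deadline passes, can be sketched generically. This is a sketch of the pattern, not Kafka's actual test utility:

```java
import java.util.function.BooleanSupplier;

public class WaitUntil {
    /** Polls the condition until it returns true or the timeout elapses; returns the final result. */
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return condition.getAsBoolean(); // one final check at (or after) the deadline
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Condition flips to true after roughly 50 ms.
        boolean ok = waitUntilTrue(() -> System.currentTimeMillis() - start >= 50, 1_000, 10);
        System.out.println(ok);
    }
}
```

In the test above, the condition is "the new leader no longer answers with `OFFSET_NOT_AVAILABLE`", which only makes sense when the request actually uses `LATEST_TIMESTAMP` or `MAX_TIMESTAMP`, hence the fix.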
[GitHub] [kafka] dajac opened a new pull request, #13747: MINOR: Fix ListOffsetsRequestTest.testResponseIncludesLeaderEpoch
dajac opened a new pull request, #13747: URL: https://github.com/apache/kafka/pull/13747 Fix flaky ListOffsetsRequestTest.testResponseIncludesLeaderEpoch. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac commented on pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
dajac commented on PR #13535: URL: https://github.com/apache/kafka/pull/13535#issuecomment-1559262765 @satishd Weird... It fails all the time on my laptop.
```
Gradle Test Run :core:test > Gradle Test Executor 9 > ListOffsetsRequestTest > testResponseIncludesLeaderEpoch() FAILED
org.opentest4j.AssertionFailedError: expected: <(10,1,0)> but was: <(-1,-1,78)>
    at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
    at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
    at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
    at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1142)
    at app//kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch(ListOffsetsRequestTest.scala:210)
```
```
% git rev-parse --verify HEAD
15f8705246e094f7825b76a38d9f12f95d626ee5
```
[GitHub] [kafka] sanjanagoyal opened a new pull request, #13746: removed recreation of partition to fix partitions having uneven data distribution
sanjanagoyal opened a new pull request, #13746: URL: https://github.com/apache/kafka/pull/13746 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] atu-sharm commented on pull request #13745: KAFKA-15015: Explicit on reload4j version
atu-sharm commented on PR #13745: URL: https://github.com/apache/kafka/pull/13745#issuecomment-1559239546 Output:
```
reload4j-1.2.25.jar
slf4j-reload4j-1.7.36.jar
```
[GitHub] [kafka] showuon commented on pull request #13745: KAFKA-15015: Explicit on reload4j version
showuon commented on PR #13745: URL: https://github.com/apache/kafka/pull/13745#issuecomment-1559220469 @atu-sharm , could you run these commands to show the output of the lib folder for the reload4j library?
```
// build the binary artifacts
$ ./gradlewAll releaseTarGz

// unpack the binary artifact
$ tar xf core/build/distributions/kafka_2.13-X.Y.Z.tgz
$ cd kafka_2.13-X.Y.Z

// list the packaged jars
// (you can ignore the jars for our own modules, like kafka, kafka-clients, etc.)
$ ls libs | grep reload
```
Thanks.
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1184873142 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -670,6 +875,14 @@ public void close() { } catch (InterruptedException e) { // ignore } +remoteStorageReaderThreadPool.shutdownNow(); +// waits for 2 mins to terminate the current tasks +try { +remoteStorageReaderThreadPool.awaitTermination(2, TimeUnit.MINUTES); Review Comment: It does not require that to be completed in 5 mins. `lifecycleManager.controlledShutdownFuture` is more about processing the controlled shutdown event to the controller for that broker. It will wait for 5 mins before proceeding with the other sequence of actions, but that will not be affected by the code introduced here. The logging subsystem handles unclean shutdown for log segments, and that will have already finished before RemoteLogManager is closed, so it will not be affected by this timeout. But we can use a short duration here, like 10 secs, and revisit introducing a config if it is really needed for closing the remote log subsystem.
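The shutdown sequence being discussed, `shutdownNow()` followed by a bounded `awaitTermination()`, is the standard `ExecutorService` pattern. A minimal sketch, assuming the 10-second bound suggested above (the pool and task here are illustrative, not Kafka's actual code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReaderPoolShutdown {
    /** Interrupts running tasks, discards queued ones, then waits a bounded time for termination. */
    static boolean shutdownAndAwait(ExecutorService pool, long timeoutSeconds) {
        pool.shutdownNow();
        try {
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> { /* a remote-storage read task */ });

        // A short bound keeps broker shutdown from blocking on in-flight reads indefinitely.
        boolean terminated = shutdownAndAwait(pool, 10);
        System.out.println("terminated cleanly: " + terminated);
    }
}
```

The design trade-off in the thread is exactly the timeout value: long enough to let in-flight reads finish, short enough not to stall the broker's overall shutdown sequence.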
[GitHub] [kafka] satishd commented on pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on PR #13535: URL: https://github.com/apache/kafka/pull/13535#issuecomment-1559216261 @dajac It is passed locally on my laptop. ``` Gradle Test Run :core:test > Gradle Test Executor 65 > ListOffsetsRequestTest > testResponseDefaultOffsetAndLeaderEpochForAllVersions() PASSED Gradle Test Run :core:test > Gradle Test Executor 65 > ListOffsetsRequestTest > testListOffsetsMaxTimeStampOldestVersion() PASSED Gradle Test Run :core:test > Gradle Test Executor 65 > ListOffsetsRequestTest > testListOffsetsErrorCodes() PASSED Gradle Test Run :core:test > Gradle Test Executor 65 > ListOffsetsRequestTest > testCurrentEpochValidation() PASSED Gradle Test Run :core:test > Gradle Test Executor 65 > ListOffsetsRequestTest > testResponseIncludesLeaderEpoch() PASSED BUILD SUCCESSFUL in 1m 8s 55 actionable tasks: 6 executed, 49 up-to-date ➜ kafka git:(apache-trunk) date Tue May 23 18:00:01 IST 2023 ➜ kafka git:(apache-trunk) git rev-parse --verify HEAD 15f8705246e094f7825b76a38d9f12f95d626ee5 ➜ kafka git:(apache-trunk) ``` ``` > Task :core:test Gradle Test Run :core:test > Gradle Test Executor 71 > ListOffsetsRequestWithRemoteStoreTest > testResponseDefaultOffsetAndLeaderEpochForAllVersions() PASSED Gradle Test Run :core:test > Gradle Test Executor 71 > ListOffsetsRequestWithRemoteStoreTest > testListOffsetsMaxTimeStampOldestVersion() PASSED Gradle Test Run :core:test > Gradle Test Executor 71 > ListOffsetsRequestWithRemoteStoreTest > testListOffsetsErrorCodes() PASSED Gradle Test Run :core:test > Gradle Test Executor 71 > ListOffsetsRequestWithRemoteStoreTest > testCurrentEpochValidation() PASSED Gradle Test Run :core:test > Gradle Test Executor 71 > ListOffsetsRequestWithRemoteStoreTest > testResponseIncludesLeaderEpoch() PASSED BUILD SUCCESSFUL in 1m 9s 55 actionable tasks: 6 executed, 49 up-to-date ➜ kafka git:(apache-trunk) date Tue May 23 18:05:20 IST 2023 ➜ kafka git:(apache-trunk) git rev-parse --verify HEAD 15f8705246e094f7825b76a38d9f12f95d626ee5 ➜ kafka 
git:(apache-trunk) ```
[jira] [Assigned] (KAFKA-15015) Binaries contain 2 versions of reload4j
[ https://issues.apache.org/jira/browse/KAFKA-15015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Atul Sharma reassigned KAFKA-15015: --- Assignee: Atul Sharma > Binaries contain 2 versions of reload4j > --- > > Key: KAFKA-15015 > URL: https://issues.apache.org/jira/browse/KAFKA-15015 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0, 3.4.1 >Reporter: Mickael Maison >Assignee: Atul Sharma >Priority: Major > > These releases ship 2 versions of reload4j: > - reload4j-1.2.19.jar > - reload4j-1.2.25.jar
[GitHub] [kafka] atu-sharm commented on pull request #13745: KAFKA-15015: Explicit on reload4j version
atu-sharm commented on PR #13745: URL: https://github.com/apache/kafka/pull/13745#issuecomment-1559203084 @showuon @mimaison can you review?
[jira] [Commented] (KAFKA-15015) Binaries contain 2 versions of reload4j
[ https://issues.apache.org/jira/browse/KAFKA-15015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725390#comment-17725390 ] Atul Sharma commented on KAFKA-15015: - [~mimaison] [~showuon] raised PR [GitHub Pull Request #13745|https://github.com/apache/kafka/pull/13745] > Binaries contain 2 versions of reload4j > --- > > Key: KAFKA-15015 > URL: https://issues.apache.org/jira/browse/KAFKA-15015 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0, 3.4.1 >Reporter: Mickael Maison >Priority: Major > > These releases ship 2 versions of reload4j: > - reload4j-1.2.19.jar > - reload4j-1.2.25.jar
[GitHub] [kafka] atu-sharm opened a new pull request, #13745: KAFKA-15015: Explicit on reload4j version
atu-sharm opened a new pull request, #13745: URL: https://github.com/apache/kafka/pull/13745 Defining a specific version for reload4j
[GitHub] [kafka] yashmayya commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
yashmayya commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1200530069

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback cb) {
     );
 }

+@Override
+public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback callback) {
+    log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+    addRequest(() -> {
+        refreshConfigSnapshot(workerSyncTimeoutMs);
+        if (!alterConnectorOffsetsChecks(connName, callback)) {
+            return null;
+        }
+        // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+        // a zombie fencing request
+        if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
+            log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName);
+            getFenceZombieSourceTasksCallable(connName, (error, ignored) -> {
+                if (error != null) {
+                    log.error("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", error);
+                    callback.onCompletion(new ConnectException("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets",
+                            error), null);
+                } else {
+                    log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName);
+                    // We need to ensure that we perform the necessary checks again inside alterConnectorOffsetsHerderRequest
+                    // since it is being run in a separate herder request and the conditions could have changed since the
+                    // previous check
+                    addRequest(getAlterConnectorOffsetsCallable(connName, offsets, callback), forwardErrorCallback(callback));
+                }
+            }).call();
+        } else {
+            getAlterConnectorOffsetsCallable(connName, offsets, callback).call();

Review Comment: I think one of the main motivations here (and above) was naming. We already have `fenceZombieSourceTasks` and `alterConnectorOffsets` interface methods, but there was a need to break out some of the logic from both into separate methods for re-use. I'm happy to take any naming suggestions to make them synchronous and wrap them into callables at the relevant call sites. Maybe something like `fenceZombieSourceTasksSync` / `alterConnectorOffsetsSync` or `fenceZombieSourceTasksInternal` / `alterConnectorOffsetsInternal`? I don't particularly like either though; naming is hard...

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback cb) {
     );
 }

+@Override
+public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback callback) {
+    log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+    addRequest(() -> {
+        refreshConfigSnapshot(workerSyncTimeoutMs);

Review Comment: Thanks, both points make sense, done. Btw, shouldn't we also handle the read timeout case in the [stopConnector](https://github.com/apache/kafka/blob/e96a463561ca8974fca37562b8675ae8ae4aff29/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1107) method? The `KafkaConfigBackingStore::putTaskConfigs` method does include a read to the end of the config topic at the end, but the subsequently called `KafkaConfigBackingStore::putTargetState` method doesn't.

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
     });
 }

+/**
+ * Alter a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be altered
+ * @param connectorConfig the connector's configurations
+ * @param offsets a mapping from partitions to offsets that need to be overwritten
+ * @param cb callback to invoke upon completion
+ */
+public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                  Map<Map<String, ?>, Map<String, ?>> offsets, Callback cb) {
+    String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+    ClassLoader connectorLoader =
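For context, the `offsets` argument in the methods quoted above maps connector-defined partition maps to offset maps. A minimal, self-contained sketch of building such a payload with plain `java.util` maps — the `filename` and `position` keys here are hypothetical, connector-specific examples, not part of the Connect API:

```java
import java.util.HashMap;
import java.util.Map;

public class AlterOffsetsPayload {
    public static void main(String[] args) {
        // Hypothetical source-connector partition: identifies the file being read.
        Map<String, Object> partition = new HashMap<>();
        partition.put("filename", "/tmp/input.txt");

        // Hypothetical offset for that partition: the position to resume from.
        Map<String, Object> offset = new HashMap<>();
        offset.put("position", 20);

        // The shape passed to alterConnectorOffsets: partition map -> offset map.
        Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
        offsets.put(partition, offset);

        System.out.println("partitions=" + offsets.size());
        System.out.println("position=" + offsets.get(partition).get("position"));
    }
}
```

Each connector defines its own partition and offset schemas, which is why the herder-level API is typed over wildcarded maps rather than concrete classes.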
[GitHub] [kafka] divijvaidya commented on pull request #13662: MINOR: Upgrade Jackson dependencies to version 2.15.0
divijvaidya commented on PR #13662: URL: https://github.com/apache/kafka/pull/13662#issuecomment-1559122983 I found today that there is another change that needs to make it in this PR. We need to update the LICENSE file with the correct version at https://github.com/apache/kafka/blob/trunk/LICENSE-binary#L211 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725357#comment-17725357 ] Matthias J. Sax commented on KAFKA-7497: Seems to be fixed. Cf https://issues.apache.org/jira/browse/KAFKA-14209 > Kafka Streams should support self-join on streams > - > > Key: KAFKA-7497 > URL: https://issues.apache.org/jira/browse/KAFKA-7497 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Robin Moffatt >Priority: Major > Labels: needs-kip > > There are valid reasons to want to join a stream to itself, but Kafka Streams > does not currently support this ({{Invalid topology: Topic foo has already > been registered by another source.}}). To perform the join requires creating > a second stream as a clone of the first, and then doing a join between the > two. This is a clunky workaround and results in unnecessary duplication of > data. -- This message was sent by Atlassian Jira (v8.20.10#820010)
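The `Invalid topology` error quoted in the issue comes from the topology's one-source-node-per-topic constraint. A toy model of that constraint (plain Java, not the actual Kafka Streams internals) shows why a naive self-join failed before KAFKA-14209:

```java
import java.util.HashMap;
import java.util.Map;

public class SourceRegistrySketch {
    // Mimics the topology-builder rule: at most one source node per input topic.
    private final Map<String, String> sourcesByTopic = new HashMap<>();

    void addSource(String name, String topic) {
        if (sourcesByTopic.putIfAbsent(topic, name) != null) {
            throw new IllegalStateException(
                "Invalid topology: Topic " + topic + " has already been registered by another source.");
        }
    }

    public static void main(String[] args) {
        SourceRegistrySketch topology = new SourceRegistrySketch();
        topology.addSource("left", "foo");
        try {
            topology.addSource("right", "foo"); // a naive self-join attempt fails here
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The workaround described in the issue sidestepped this rule by materializing a second copy of the topic so each join side had its own source node.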
[jira] [Resolved] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-7497. Resolution: Fixed
[GitHub] [kafka] mimaison commented on pull request #13744: MINOR: update 3.4.1 licence
mimaison commented on PR #13744: URL: https://github.com/apache/kafka/pull/13744#issuecomment-1559077621 @divijvaidya For the "audit" I filed https://issues.apache.org/jira/browse/KAFKA-15016 yesterday. The validation we use to check what the LICENSE file should include is documented in https://issues.apache.org/jira/browse/KAFKA-12622
[jira] [Commented] (KAFKA-14173) TopologyTestDriver does not use mock wall clock time when sending test records
[ https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725356#comment-17725356 ] Matthias J. Sax commented on KAFKA-14173: - Just discovering this ticket. I guess you would need to use `TestInputTopic#advanceTime` for this case? Closing the ticket as "not an issue", as the API is there. Feel free to follow up. > TopologyTestDriver does not use mock wall clock time when sending test records > -- > > Key: KAFKA-14173 > URL: https://issues.apache.org/jira/browse/KAFKA-14173 > Project: Kafka > Issue Type: Bug > Components: streams-test-utils >Affects Versions: 2.3.1 >Reporter: Guido Josquin >Priority: Minor > > I am trying to test a stream-stream join with `TopologyTestDriver`. My goal > is to confirm that my topology performs the following left join correctly. > {code:java} > bills > .leftJoin(payments)( > { > case (billValue, null) => billValue > case (billValue, paymentValue) => (billValue.toInt - > paymentValue.toInt).toString > }, > JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)) > ) > .to("debt") > {code} > > In other words, if we see a `bill` and a `payment` within 100ms, the payment > should be subtracted from the bill. If we do not see a payment, the debt is > simply the bill. > Here is the test code.
> {code:java} > val simpleLeftJoinTopology = new SimpleLeftJoinTopology > val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology) > val serde = Serdes.stringSerde > val bills = driver.createInputTopic("bills", serde.serializer, > serde.serializer) > val payments = driver.createInputTopic("payments", serde.serializer, > serde.serializer) > val debt = driver.createOutputTopic("debt", serde.deserializer, > serde.deserializer) > bills.pipeInput("fred", "100") > bills.pipeInput("george", "20") > payments.pipeInput("fred", "95") > // When in doubt, sleep twice > driver.advanceWallClockTime(Duration.ofMillis(500)) > Thread.sleep(500) > // Send a new record to cause the previous window to be closed > payments.pipeInput("percy", "0") > val keyValues = debt.readKeyValuesToList() > keyValues should contain theSameElementsAs Seq( > // This record is present > new KeyValue[String, String]("fred", "5"), > // This record is missing > new KeyValue[String, String]("george", "20") > ) > {code} > Full code available at [https://github.com/Oduig/kstreams-left-join-example] > It seems that advancing the wall clock time, sleeping, or sending an extra > record, never triggers the join condition when data only arrives on the left > side. It is possible to circumvent this by passing an explicit event time > with each test record. (See > https://stackoverflow.com/questions/73443812/using-kafka-streams-topologytestdriver-how-to-test-left-join-between-two-strea/73540161#73540161) > > However, the behavior deviates from a real Kafka broker. With a real broker, > if we do not send an event, it uses the wall clock time of the broker > instead. The behavior under test should be the same: > `driver.advanceWallClockTime` should provide the default time to be used for > `TestTopic.pipeInput`, when no other time is specified.
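The join semantics this test expects can be checked without a broker at all. A self-contained sketch (plain Java, deliberately not the Streams API) of a left join over two in-memory streams with a 100 ms window, assuming every record carries an explicit timestamp — which is exactly the workaround the issue mentions:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowedLeftJoinSketch {
    record Rec(String key, String value, long ts) {}

    // Left-join bills against payments whose timestamps fall within +/- windowMs.
    static Map<String, String> leftJoin(List<Rec> bills, List<Rec> payments, long windowMs) {
        Map<String, String> debt = new LinkedHashMap<>();
        for (Rec bill : bills) {
            String result = bill.value(); // no matching payment: the debt is the bill
            for (Rec pay : payments) {
                if (pay.key().equals(bill.key()) && Math.abs(pay.ts() - bill.ts()) <= windowMs) {
                    result = String.valueOf(Integer.parseInt(bill.value()) - Integer.parseInt(pay.value()));
                }
            }
            debt.put(bill.key(), result);
        }
        return debt;
    }

    public static void main(String[] args) {
        List<Rec> bills = List.of(new Rec("fred", "100", 0), new Rec("george", "20", 10));
        List<Rec> payments = List.of(new Rec("fred", "95", 50));
        // fred's payment lands inside the window (|50 - 0| <= 100); george has none.
        System.out.println(leftJoin(bills, payments, 100));
    }
}
```

This prints `{fred=5, george=20}` — the result the test above expects; in the Streams test it only materializes once each record has an explicit event time instead of relying on wall-clock advancement.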
[jira] [Resolved] (KAFKA-14173) TopologyTestDriver does not use mock wall clock time when sending test records
[ https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-14173. - Resolution: Not A Problem
[jira] [Updated] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10575: Labels: kip (was: ) > StateRestoreListener#onRestoreEnd should always be triggered > > > Key: KAFKA-10575 > URL: https://issues.apache.org/jira/browse/KAFKA-10575 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Labels: kip > Fix For: 3.5.0 > > > Part of KIP-869: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility] > Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete > the restoration of an active task and transit it to the running state. > However the restoration can also be stopped when the restoring task gets > closed (because it gets migrated to another client, for example). We should > also trigger the callback indicating its progress when the restoration > stopped in any scenarios.
[jira] [Updated] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10575: Description: Part of KIP-869: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility] Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete the restoration of an active task and transit it to the running state. However the restoration can also be stopped when the restoring task gets closed (because it gets migrated to another client, for example). We should also trigger the callback indicating its progress when the restoration stopped in any scenarios. was: Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete the restoration of an active task and transit it to the running state. However the restoration can also be stopped when the restoring task gets closed (because it gets migrated to another client, for example). We should also trigger the callback indicating its progress when the restoration stopped in any scenarios.
[jira] [Resolved] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-10575. - Fix Version/s: 3.5.0 Resolution: Fixed
[jira] [Assigned] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-10575: --- Assignee: Guozhang Wang (was: highluck)
[GitHub] [kafka] divijvaidya commented on pull request #13744: MINOR: update 3.4.1 licence
divijvaidya commented on PR #13744: URL: https://github.com/apache/kafka/pull/13744#issuecomment-1559041622 Hey @showuon I know that it is unrelated to your specific change, but we seem to be missing license entries for a bunch of dependencies such as `bcpkix` and `jaxb`. Shall we do an audit on whether we have listed all dependencies here?
[GitHub] [kafka] machi1990 commented on a diff in pull request #13744: MINOR: update 3.4.1 licence
machi1990 commented on code in PR #13744: URL: https://github.com/apache/kafka/pull/13744#discussion_r1202034297

## LICENSE-binary:
@@ -205,53 +205,54 @@ This project bundles some components that are also licensed under the Apache License Version 2.0:

-audience-annotations-0.5.0
+audience-annotations-0.13.0
 commons-cli-1.4
 commons-lang3-3.8.1
-jackson-annotations-2.13.4
-jackson-core-2.13.4
-jackson-databind-2.13.4.2
-jackson-dataformat-csv-2.13.4
-jackson-dataformat-yaml-2.13.4
-jackson-datatype-jdk8-2.13.4
-jackson-datatype-jsr310-2.13.4
-jackson-jaxrs-base-2.13.4
-jackson-jaxrs-json-provider-2.13.4
-jackson-module-jaxb-annotations-2.13.4
-jackson-module-scala_2.13-2.13.4
-jackson-module-scala_2.12-2.13.4
+jackson-annotations-2.13.5
+jackson-core-2.13.5
+jackson-databind-2.13.5
+jackson-dataformat-csv-2.13.5
+jackson-dataformat-yaml-2.13.5
+jackson-datatype-jdk8-2.13.5
+jackson-datatype-jsr310-2.13.5
+jackson-jaxrs-base-2.13.5
+jackson-jaxrs-json-provider-2.13.5
+jackson-module-jaxb-annotations-2.13.5
+jackson-module-scala_2.13-2.13.5
+jackson-module-scala_2.12-2.13.5
 jakarta.validation-api-2.0.2
-javassist-3.27.0-GA
-jetty-client-9.4.48.v20220622
-jetty-continuation-9.4.48.v20220622
-jetty-http-9.4.48.v20220622
-jetty-io-9.4.48.v20220622
-jetty-security-9.4.48.v20220622
-jetty-server-9.4.48.v20220622
-jetty-servlet-9.4.48.v20220622
-jetty-servlets-9.4.48.v20220622
-jetty-util-9.4.48.v20220622
-jetty-util-ajax-9.4.48.v20220622
-jersey-common-2.34
-jersey-server-2.34
+javassist-3.29.2-GA
+jetty-client-9.4.51.v20230217
+jetty-continuation-9.4.51.v20230217
+jetty-http-9.4.51.v20230217
+jetty-io-9.4.51.v20230217
+jetty-security-9.4.51.v20230217
+jetty-server-9.4.51.v20230217
+jetty-servlet-9.4.51.v20230217
+jetty-servlets-9.4.51.v20230217
+jetty-util-9.4.51.v20230217
+jetty-util-ajax-9.4.51.v20230217
+jersey-common-2.39.1
+jersey-server-2.39.1
 jose4j-0.9.3
 lz4-java-1.8.0
 maven-artifact-3.8.4
 metrics-core-4.1.12.1
 metrics-core-2.2.0
-netty-buffer-4.1.78.Final
-netty-codec-4.1.78.Final
-netty-common-4.1.78.Final
-netty-handler-4.1.78.Final
-netty-resolver-4.1.78.Final
-netty-transport-4.1.78.Final
-netty-transport-classes-epoll-4.1.78.Final
-netty-transport-native-epoll-4.1.78.Final
-netty-transport-native-unix-common-4.1.78.Final
+netty-buffer-4.1.92.Final
+netty-codec-4.1.92.Final
+netty-common-4.1.92.Final
+netty-handler-4.1.92.Final
+netty-resolver-4.1.92.Final
+netty-transport-4.1.92.Final
+netty-transport-classes-epoll-4.1.92.Final
+netty-transport-native-epoll-4.1.92.Final
+netty-transport-native-unix-common-4.1.92.Final
 plexus-utils-3.3.0
 reload4j-1.2.19
+reload4j-1.2.25

Review Comment: ah right. Thanks!
[GitHub] [kafka] showuon commented on a diff in pull request #13744: MINOR: update 3.4.1 licence
showuon commented on code in PR #13744: URL: https://github.com/apache/kafka/pull/13744#discussion_r1202033613

## LICENSE-binary:
 plexus-utils-3.3.0
 reload4j-1.2.19
+reload4j-1.2.25

Review Comment: Yes, this is a known issue: https://issues.apache.org/jira/browse/KAFKA-15015 .
[GitHub] [kafka] machi1990 commented on a diff in pull request #13744: MINOR: update 3.4.1 licence
machi1990 commented on code in PR #13744: URL: https://github.com/apache/kafka/pull/13744#discussion_r1202030847

## LICENSE-binary:
 plexus-utils-3.3.0
 reload4j-1.2.19
+reload4j-1.2.25

Review Comment: duplicate dependency?
[jira] [Assigned] (KAFKA-14539) Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
[ https://issues.apache.org/jira/browse/KAFKA-14539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danica Fine reassigned KAFKA-14539: --- Assignee: Danica Fine > Simplify StreamsMetadataState by replacing the Cluster metadata with > partition info map > --- > > Key: KAFKA-14539 > URL: https://issues.apache.org/jira/browse/KAFKA-14539 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Danica Fine >Priority: Major > > We can clean up the StreamsMetadataState class a bit by removing the > #onChange invocation that currently occurs within > StreamsPartitionAssignor#assign, which then lets us remove the `Cluster` > parameter in that callback. Instead of building a fake Cluster object from > the map of partition info when we invoke #onChange inside the > StreamsPartitionAssignor#onAssignment method, we can just directly pass in > the `Map` and replace the usage of `Cluster` > everywhere in StreamsMetadataState > (I believe the current system is a historical artifact from when we used to > require passing in a {{Cluster}} for the default partitioning strategy, which > the StreamMetadataState needs to compute the partition for a key. At some > point in the past we provided a better way to get the default partition, so > we no longer need a {{Cluster}} parameter/field at all)
[GitHub] [kafka] showuon commented on pull request #13744: MINOR: update 3.4.1 licence
showuon commented on PR #13744: URL: https://github.com/apache/kafka/pull/13744#issuecomment-1559011060 @mimaison, call for review. Thanks.
[GitHub] [kafka] showuon opened a new pull request, #13744: MINOR: update 3.4.1 licence
showuon opened a new pull request, #13744: URL: https://github.com/apache/kafka/pull/13744

update v3.4.1 licence

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac merged pull request #13708: KAFKA-14500; [4/N] Add Timer interface
dajac merged PR #13708: URL: https://github.com/apache/kafka/pull/13708
[GitHub] [kafka] mimaison closed pull request #13581: A document on the current usage of Kafka project is added
mimaison closed pull request #13581: A document on the current usage of Kafka project is added URL: https://github.com/apache/kafka/pull/13581
[GitHub] [kafka] mimaison commented on pull request #13581: A document on the current usage of Kafka project is added
mimaison commented on PR #13581: URL: https://github.com/apache/kafka/pull/13581#issuecomment-1558820436 Because it's opened against the 3.5 branch, this PR is currently preventing us from adding the 3.5 branch to the Apache Kafka Jenkins job. So I'll close the PR temporarily while I fix the Jenkins job.
[GitHub] [kafka] dajac merged pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers
dajac merged PR #13704: URL: https://github.com/apache/kafka/pull/13704
[jira] [Commented] (KAFKA-15015) Binaries contain 2 versions of reload4j
[ https://issues.apache.org/jira/browse/KAFKA-15015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725278#comment-17725278 ] Mickael Maison commented on KAFKA-15015: I don't think it's a blocker for 3.4.1 or 3.5.0 so we don't necessarily need the fix immediately. > Binaries contain 2 versions of reload4j > --- > > Key: KAFKA-15015 > URL: https://issues.apache.org/jira/browse/KAFKA-15015 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0, 3.4.1 >Reporter: Mickael Maison >Priority: Major > > These releases ship 2 versions of reload4j: > - reload4j-1.2.19.jar > - reload4j-1.2.25.jar
[GitHub] [kafka] akatona84 commented on pull request #13733: KAFKA-13337: fix of possible java.nio.file.AccessDeniedException during Connect plugin directory scan
akatona84 commented on PR #13733: URL: https://github.com/apache/kafka/pull/13733#issuecomment-1558629671 > Is there any way to decide if a specific file/dir is meant to be a plugin? Currently the code only checks whether it is a directory or whether the extension is zip, jar, or class. Yet for an unreadable one you can't decide anything, even that much; actual plugin scanning is done a bit later. Issues can be: io exceptions (reading problems) and url/path problems (bad paths were given). I don't see other possible cases. It would be a breaking change to prevent Connect from starting in case of errors like these.
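The skip-on-error behavior under discussion — an unreadable plugin location hides only its own plugins, not the whole plugin path — can be sketched with plain `java.nio.file`. `scanPluginLocations` below is a hypothetical stand-in for Connect's directory scan, not the actual `PluginUtils` code:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class PluginScanSketch {
    // Collect candidate plugin locations, skipping any directory that cannot be read
    // (e.g. AccessDeniedException) instead of aborting the entire scan.
    static List<Path> scanPluginLocations(List<Path> pluginPath) {
        List<Path> locations = new ArrayList<>();
        for (Path dir : pluginPath) {
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
                for (Path entry : stream) {
                    locations.add(entry);
                }
            } catch (IOException e) {
                // Plugins inside this directory go missing, but other directories stay visible.
                System.err.println("Skipping unreadable plugin location " + dir + ": " + e);
            }
        }
        return locations;
    }

    public static void main(String[] args) throws IOException {
        Path readable = Files.createTempDirectory("plugins");
        Files.createFile(readable.resolve("connector.jar"));
        Path unreadable = readable.resolve("no-such-dir"); // triggers the skip branch
        List<Path> found = scanPluginLocations(List.of(readable, unreadable));
        System.out.println("found=" + found.size());
    }
}
```

The catch-and-continue structure is the key point: one bad entry in the plugin path logs a warning rather than throwing out of the scan loop.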
[GitHub] [kafka] dajac commented on pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
dajac commented on PR #13535: URL: https://github.com/apache/kafka/pull/13535#issuecomment-1558604752 @satishd `testResponseIncludesLeaderEpoch` fails locally. Does it pass for you? It does not seem to be related to slow CI.