[GitHub] [kafka] vamossagar12 commented on pull request #13733: KAFKA-13337: fix of possible java.nio.file.AccessDeniedException during Connect plugin directory scan

2023-05-23 Thread via GitHub


vamossagar12 commented on PR #13733:
URL: https://github.com/apache/kafka/pull/13733#issuecomment-1560458136

   Thanks @akatona84 for the clarification. If I go by what @gharris1727 is 
saying above, i.e 
   
   > When a directory is not accessible, only plugins within that directory 
should be missing. Plugins from other directories should still be visible if 
possible.
   
   then the changes that you have made in the PR become relevant. The changes 
look fine to me  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-05-23 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1181520975


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -581,11 +588,18 @@ public void run() {
 if (isLeader()) {
 // Copy log segments to remote storage
 copyLogSegmentsToRemote();
+// Cleanup/delete expired remote log segments
+cleanupExpiredRemoteLogSegments();

Review Comment:
   This is different from local log deletion. It requires the deletion of 
segments from local storage which need to really delete the files. 
   But incase of remote storages, it does not wait for the data to be deleted 
but it marks the file or object for deletion in their respective metadata 
stores. Respective garbage collectors in those storages will take care of 
deleting the data asynchronously. There is no perf impact for these delete 
calls as they take a much shorter time than copying segments. It is very 
unlikely that copying segments get affected because of the deletion of 
segments. Deletion checks are happening in every iteration so there will not be 
many segments that need to be deleted. 
   Anyways, we can discuss this separately in a separate JIRA. On another note, 
all this logic will go to UnifiedLog in future.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14836) Fix UtilsTest#testToLogDateTimeFormat failure in some cases

2023-05-23 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14836:
--
Fix Version/s: 3.4.1

> Fix UtilsTest#testToLogDateTimeFormat failure in some cases
> ---
>
> Key: KAFKA-14836
> URL: https://issues.apache.org/jira/browse/KAFKA-14836
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tamas Barnabas Egyed
>Assignee: Tamas Barnabas Egyed
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
>  
> {code:java}
> org.apache.kafka.common.utils.UtilsTest.testToLogDateTimeFormat {code}
> test is failing in some cases. It uses hard coded datetime ({*}2020-11-09 
> 12:34:05{*}), while the assertation expected part contains the offset of the 
> current time ({*}Instant.now(){*}). This can lead to problems if there is a 
> summer and winter time for the specific location. We should use the same 
> datetime in the offset query instead of the current time.
> {noformat}
> org.opentest4j.AssertionFailedError: expected: <2020-11-09 12:34:05,123 
> -07:00> but was: <2020-11-09 12:34:05,123 -08:00>{noformat}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon merged pull request #13744: MINOR: update 3.4.1 licence

2023-05-23 Thread via GitHub


showuon merged PR #13744:
URL: https://github.com/apache/kafka/pull/13744


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13745: KAFKA-15015: Explicit on reload4j version

2023-05-23 Thread via GitHub


showuon commented on PR #13745:
URL: https://github.com/apache/kafka/pull/13745#issuecomment-1560339089

   Backported to 3.4 and 3.5 branch. @mimaison , if you have another RC build, 
this duplicated reload4j binary issue will be fixed. FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15015) Binaries contain 2 versions of reload4j

2023-05-23 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15015.
---
Fix Version/s: 3.5.0
   3.4.1
   Resolution: Fixed

> Binaries contain 2 versions of reload4j
> ---
>
> Key: KAFKA-15015
> URL: https://issues.apache.org/jira/browse/KAFKA-15015
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0, 3.4.1
>Reporter: Mickael Maison
>Assignee: Atul Sharma
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> These releases ship 2 versions of reload4j:
> - reload4j-1.2.19.jar
> - reload4j-1.2.25.jar



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon merged pull request #13745: KAFKA-15015: Explicit on reload4j version

2023-05-23 Thread via GitHub


showuon merged PR #13745:
URL: https://github.com/apache/kafka/pull/13745


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13745: KAFKA-15015: Explicit on reload4j version

2023-05-23 Thread via GitHub


showuon commented on PR #13745:
URL: https://github.com/apache/kafka/pull/13745#issuecomment-1560336419

   Failed tests also failed in trunk build
   ```
   Build / JDK 17 and Scala 2.13 / 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
   Build / JDK 17 and Scala 2.13 / 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
   Build / JDK 17 and Scala 2.13 / 
kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
   Build / JDK 17 and Scala 2.13 / 
kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 11 and Scala 2.13 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
   Build / JDK 11 and Scala 2.13 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
   Build / JDK 11 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testSubscribeWhenTopicUnavailable()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203225900


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -0,0 +1,2017 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203224577


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -0,0 +1,2017 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203222949


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -0,0 +1,2017 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static 

[GitHub] [kafka] junrao commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-05-23 Thread via GitHub


junrao commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1203191600


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -522,7 +522,12 @@ class BrokerServer(
   }
 
   Some(new RemoteLogManager(remoteLogManagerConfig, config.brokerId, 
config.logDirs.head, time,
-(tp: TopicPartition) => logManager.getLog(tp).asJava));
+(tp: TopicPartition) => logManager.getLog(tp).asJava,
+(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
+  logManager.getLog(tp).foreach(log => {
+log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset)
+  })
+}));

Review Comment:
   unnecessary semicolon. Ditto in KafkaServer.



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +629,193 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+
+class RemoteLogRetentionHandler {
+
+private long remainingBreachedSize = 0L;
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(long remainingBreachedSize) {
+this.remainingBreachedSize = remainingBreachedSize;
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size > 0
+if (checkSizeRetention && remainingBreachedSize > 0) {
+remainingBreachedSize -= x.segmentSizeInBytes();
+return remainingBreachedSize >= 0;
+} else return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+"${log.config.retentionSize} breach. Log size 
after deletion will be " +
+"${remainingBreachedSize + 
log.config.retentionSize}.");
+}
+return isSegmentDeleted;
+}
+
+private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+if (isSegmentDeleted) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+}
+
+// No need to update the logStartOffset.
+return isSegmentDeleted;
+}
+
+// There are two cases:
+// 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the
+//leader-epoch gets bumped but the log-start-offset gets 
truncated back to 0.
+// 2) To remove the unreferenced segments.
+private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+if (isSegmentDeleted) {
+logger.info("Deleted remote log segment ${} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+}
+
+// No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+return isSegmentDeleted;
+}
+
+private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate predicate)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+if 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203192959


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -0,0 +1,2080 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203184877


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * A Consumer Group. All the metadata in this class are backed by
+ * records in the __consumer_offsets partitions.
+ */
+public class ConsumerGroup implements Group {
+
+public enum ConsumerGroupState {
+EMPTY("empty"),
+ASSIGNING("assigning"),
+RECONCILING("reconciling"),
+STABLE("stable"),
+DEAD("dead");
+
+private final String name;
+
+ConsumerGroupState(String name) {
+this.name = name;
+}
+
+@Override
+public String toString() {
+return name;
+}
+}
+
+/**
+ * The snapshot registry.
+ */
+private final SnapshotRegistry snapshotRegistry;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group state.
+ */
+private final TimelineObject state;
+
+/**
+ * The group epoch. The epoch is incremented whenever the subscriptions
+ * are updated and it will trigger the computation of a new assignment
+ * for the group.
+ */
+private final TimelineInteger groupEpoch;
+
+/**
+ * The group members.
+ */
+private final TimelineHashMap members;
+
+/**
+ * The metadata of the subscribed topics.
+ */
+private final TimelineHashMap 
subscribedTopicMetadata;
+
+/**
+ * The assignment epoch. An assignment epoch smaller than the group epoch 
means
+ * that a new assignment is required. The assignment epoch is updated when 
a new
+ * assignment is installed.
+ */
+private final TimelineInteger assignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private final TimelineHashMap assignments;
+
+/**
+ * The current partition epoch maps each topic-partitions to their current 
epoch where
+ * the epoch is the epoch of their owners. When a member revokes a 
partition, it removes
+ * itself from this map. When a member gets a partition, it adds itself to 
this map.
+ */
+private final TimelineHashMap> 
currentPartitionEpoch;
+
+public ConsumerGroup(
+SnapshotRegistry snapshotRegistry,
+String groupId
+) {
+this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+this.groupId = Objects.requireNonNull(groupId);
+this.state = new TimelineObject<>(snapshotRegistry, 
ConsumerGroupState.EMPTY);
+this.groupEpoch = new TimelineInteger(snapshotRegistry);
+this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 
0);
+this.assignmentEpoch = new TimelineInteger(snapshotRegistry);
+this.assignments = new TimelineHashMap<>(snapshotRegistry, 0);
+this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 
0);
+}
+
+/**
+ * The type of this group.
+ *
+ * @return The group type (Consumer).
+ */
+@Override
+public GroupType type() {
+return GroupType.CONSUMER;
+}
+
+/**
+ * The state of this group.
+ *
+ * @return The current state as a String.
+ */
+@Override
+public String stateAsString() {
+return 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203181594


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * A Consumer Group. All the metadata in this class are backed by
+ * records in the __consumer_offsets partitions.
+ */
+public class ConsumerGroup implements Group {
+
+public enum ConsumerGroupState {
+EMPTY("empty"),
+ASSIGNING("assigning"),
+RECONCILING("reconciling"),
+STABLE("stable"),
+DEAD("dead");
+
+private final String name;
+
+ConsumerGroupState(String name) {
+this.name = name;
+}
+
+@Override
+public String toString() {
+return name;
+}
+}
+
+/**
+ * The snapshot registry.
+ */
+private final SnapshotRegistry snapshotRegistry;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group state.
+ */
+private final TimelineObject state;
+
+/**
+ * The group epoch. The epoch is incremented whenever the subscriptions
+ * are updated and it will trigger the computation of a new assignment
+ * for the group.
+ */
+private final TimelineInteger groupEpoch;
+
+/**
+ * The group members.
+ */
+private final TimelineHashMap members;
+
+/**
+ * The metadata of the subscribed topics.
+ */
+private final TimelineHashMap 
subscribedTopicMetadata;
+
+/**
+ * The assignment epoch. An assignment epoch smaller than the group epoch 
means
+ * that a new assignment is required. The assignment epoch is updated when 
a new
+ * assignment is installed.
+ */
+private final TimelineInteger assignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private final TimelineHashMap assignments;
+
+/**
+ * The current partition epoch maps each topic-partitions to their current 
epoch where
+ * the epoch is the epoch of their owners. When a member revokes a 
partition, it removes
+ * itself from this map. When a member gets a partition, it adds itself to 
this map.
+ */
+private final TimelineHashMap> 
currentPartitionEpoch;
+
+public ConsumerGroup(
+SnapshotRegistry snapshotRegistry,
+String groupId
+) {
+this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+this.groupId = Objects.requireNonNull(groupId);
+this.state = new TimelineObject<>(snapshotRegistry, 
ConsumerGroupState.EMPTY);
+this.groupEpoch = new TimelineInteger(snapshotRegistry);
+this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 
0);
+this.assignmentEpoch = new TimelineInteger(snapshotRegistry);
+this.assignments = new TimelineHashMap<>(snapshotRegistry, 0);
+this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 
0);
+}
+
+/**
+ * The type of this group.
+ *
+ * @return The group type (Consumer).
+ */
+@Override
+public GroupType type() {
+return GroupType.CONSUMER;
+}
+
+/**
+ * The state of this group.
+ *
+ * @return The current state as a String.
+ */
+@Override
+public String stateAsString() {
+return 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203158830


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A Consumer Group. All the metadata in this class are backed by
+ * records in the __consumer_offsets partitions.
+ */
+public class ConsumerGroup implements Group {
+
+public enum ConsumerGroupState {
+EMPTY("empty"),
+ASSIGNING("assigning"),
+RECONCILING("reconciling"),
+STABLE("stable"),
+DEAD("dead");
+
+private final String name;
+
+ConsumerGroupState(String name) {
+this.name = name;
+}
+
+@Override
+public String toString() {
+return name;
+}
+}
+
+/**
+ * The snapshot registry.
+ */
+private final SnapshotRegistry snapshotRegistry;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group state.
+ */
+private final TimelineObject state;
+
+/**
+ * The group epoch. The epoch is incremented whenever the subscriptions
+ * are updated and it will trigger the computation of a new assignment
+ * for the group.
+ */
+private final TimelineInteger groupEpoch;
+
+/**
+ * The group members.
+ */
+private final TimelineHashMap members;
+
+/**
+ * The number of members per server assignor name.
+ */
+private final TimelineHashMap serverAssignors;
+
+/**
+ * The number of subscribers per topic.
+ */
+private final TimelineHashMap subscribedTopicNames;
+
+/**
+ * The metadata of the subscribed topics.
+ */
+private final TimelineHashMap 
subscribedTopicMetadata;
+
+/**
+ * The assignment epoch. An assignment epoch smaller than the group epoch 
means
+ * that a new assignment is required. The assignment epoch is updated when 
a new
+ * assignment is installed.
+ */
+private final TimelineInteger assignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private final TimelineHashMap assignments;
+
+/**
+ * The current partition epoch maps each topic-partitions to their current 
epoch where
+ * the epoch is the epoch of their owners. When a member revokes a 
partition, it removes
+ * itself from this map. When a member gets a partition, it adds itself to 
this map.
+ */
+private final TimelineHashMap> 
currentPartitionEpoch;
+
+public ConsumerGroup(
+SnapshotRegistry snapshotRegistry,
+String groupId
+) {
+this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+this.groupId = Objects.requireNonNull(groupId);
+this.state = new TimelineObject<>(snapshotRegistry, 
ConsumerGroupState.EMPTY);
+this.groupEpoch = new TimelineInteger(snapshotRegistry);
+this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
+this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
+this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 
0);
+this.assignmentEpoch = new TimelineInteger(snapshotRegistry);
+this.assignments = new TimelineHashMap<>(snapshotRegistry, 0);
+this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 
0);
+}
+
+/**
+ * The type of 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203152667


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A Consumer Group. All the metadata in this class are backed by
+ * records in the __consumer_offsets partitions.
+ */
+public class ConsumerGroup implements Group {
+
+public enum ConsumerGroupState {
+EMPTY("empty"),
+ASSIGNING("assigning"),
+RECONCILING("reconciling"),
+STABLE("stable"),
+DEAD("dead");
+
+private final String name;
+
+ConsumerGroupState(String name) {
+this.name = name;
+}
+
+@Override
+public String toString() {
+return name;
+}
+}
+
+/**
+ * The snapshot registry.
+ */
+private final SnapshotRegistry snapshotRegistry;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group state.
+ */
+private final TimelineObject state;
+
+/**
+ * The group epoch. The epoch is incremented whenever the subscriptions
+ * are updated and it will trigger the computation of a new assignment
+ * for the group.
+ */
+private final TimelineInteger groupEpoch;
+
+/**
+ * The group members.
+ */
+private final TimelineHashMap members;
+
+/**
+ * The number of members per server assignor name.
+ */
+private final TimelineHashMap serverAssignors;
+
+/**
+ * The number of subscribers per topic.
+ */
+private final TimelineHashMap subscribedTopicNames;
+
+/**
+ * The metadata of the subscribed topics.
+ */
+private final TimelineHashMap 
subscribedTopicMetadata;
+
+/**
+ * The assignment epoch. An assignment epoch smaller than the group epoch 
means
+ * that a new assignment is required. The assignment epoch is updated when 
a new
+ * assignment is installed.
+ */
+private final TimelineInteger assignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private final TimelineHashMap assignments;
+
+/**
+ * The current partition epoch maps each topic-partitions to their current 
epoch where
+ * the epoch is the epoch of their owners. When a member revokes a 
partition, it removes
+ * itself from this map. When a member gets a partition, it adds itself to 
this map.
+ */
+private final TimelineHashMap> 
currentPartitionEpoch;
+
+public ConsumerGroup(
+SnapshotRegistry snapshotRegistry,
+String groupId
+) {
+this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+this.groupId = Objects.requireNonNull(groupId);
+this.state = new TimelineObject<>(snapshotRegistry, 
ConsumerGroupState.EMPTY);
+this.groupEpoch = new TimelineInteger(snapshotRegistry);
+this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
+this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
+this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 
0);
+this.assignmentEpoch = new TimelineInteger(snapshotRegistry);
+this.assignments = new TimelineHashMap<>(snapshotRegistry, 0);
+this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 
0);
+}
+
+/**
+ * The type of 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203152667


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A Consumer Group. All the metadata in this class are backed by
+ * records in the __consumer_offsets partitions.
+ */
+public class ConsumerGroup implements Group {
+
+public enum ConsumerGroupState {
+EMPTY("empty"),
+ASSIGNING("assigning"),
+RECONCILING("reconciling"),
+STABLE("stable"),
+DEAD("dead");
+
+private final String name;
+
+ConsumerGroupState(String name) {
+this.name = name;
+}
+
+@Override
+public String toString() {
+return name;
+}
+}
+
+/**
+ * The snapshot registry.
+ */
+private final SnapshotRegistry snapshotRegistry;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group state.
+ */
+private final TimelineObject state;
+
+/**
+ * The group epoch. The epoch is incremented whenever the subscriptions
+ * are updated and it will trigger the computation of a new assignment
+ * for the group.
+ */
+private final TimelineInteger groupEpoch;
+
+/**
+ * The group members.
+ */
+private final TimelineHashMap members;
+
+/**
+ * The number of members per server assignor name.
+ */
+private final TimelineHashMap serverAssignors;
+
+/**
+ * The number of subscribers per topic.
+ */
+private final TimelineHashMap subscribedTopicNames;
+
+/**
+ * The metadata of the subscribed topics.
+ */
+private final TimelineHashMap 
subscribedTopicMetadata;
+
+/**
+ * The assignment epoch. An assignment epoch smaller than the group epoch 
means
+ * that a new assignment is required. The assignment epoch is updated when 
a new
+ * assignment is installed.
+ */
+private final TimelineInteger assignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private final TimelineHashMap assignments;
+
+/**
+ * The current partition epoch maps each topic-partitions to their current 
epoch where
+ * the epoch is the epoch of their owners. When a member revokes a 
partition, it removes
+ * itself from this map. When a member gets a partition, it adds itself to 
this map.
+ */
+private final TimelineHashMap> 
currentPartitionEpoch;
+
+public ConsumerGroup(
+SnapshotRegistry snapshotRegistry,
+String groupId
+) {
+this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+this.groupId = Objects.requireNonNull(groupId);
+this.state = new TimelineObject<>(snapshotRegistry, 
ConsumerGroupState.EMPTY);
+this.groupEpoch = new TimelineInteger(snapshotRegistry);
+this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
+this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
+this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 
0);
+this.assignmentEpoch = new TimelineInteger(snapshotRegistry);
+this.assignments = new TimelineHashMap<>(snapshotRegistry, 0);
+this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 
0);
+}
+
+/**
+ * The type of 

[GitHub] [kafka] danicafine opened a new pull request, #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

2023-05-23 Thread via GitHub


danicafine opened a new pull request, #13751:
URL: https://github.com/apache/kafka/pull/13751

   Replace usage of Cluster in StreamsMetadataState with Map>. Update StreamsPartitionAssignor#onAssignment method to 
pass existing Map instead of fake Cluster object.
   
   Behavior remains the same; updated existing unit tests accordingly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203150079


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * A Consumer Group. All the metadata in this class are backed by
+ * records in the __consumer_offsets partitions.
+ */
+public class ConsumerGroup implements Group {
+
+public enum ConsumerGroupState {
+EMPTY("empty"),
+ASSIGNING("assigning"),
+RECONCILING("reconciling"),
+STABLE("stable"),
+DEAD("dead");
+
+private final String name;
+
+ConsumerGroupState(String name) {
+this.name = name;
+}
+
+@Override
+public String toString() {
+return name;
+}
+}
+
+/**
+ * The snapshot registry.
+ */
+private final SnapshotRegistry snapshotRegistry;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group state.
+ */
+private final TimelineObject state;
+
+/**
+ * The group epoch. The epoch is incremented whenever the subscriptions
+ * are updated and it will trigger the computation of a new assignment
+ * for the group.
+ */
+private final TimelineInteger groupEpoch;
+
+/**
+ * The group members.
+ */
+private final TimelineHashMap members;
+
+/**
+ * The metadata of the subscribed topics.
+ */
+private final TimelineHashMap 
subscribedTopicMetadata;
+
+/**
+ * The assignment epoch. An assignment epoch smaller than the group epoch 
means
+ * that a new assignment is required. The assignment epoch is updated when 
a new
+ * assignment is installed.
+ */
+private final TimelineInteger assignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private final TimelineHashMap assignments;
+
+/**
+ * The current partition epoch maps each topic-partitions to their current 
epoch where
+ * the epoch is the epoch of their owners. When a member revokes a 
partition, it removes
+ * itself from this map. When a member gets a partition, it adds itself to 
this map.
+ */
+private final TimelineHashMap> 
currentPartitionEpoch;
+
+public ConsumerGroup(
+SnapshotRegistry snapshotRegistry,
+String groupId
+) {
+this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+this.groupId = Objects.requireNonNull(groupId);
+this.state = new TimelineObject<>(snapshotRegistry, 
ConsumerGroupState.EMPTY);
+this.groupEpoch = new TimelineInteger(snapshotRegistry);
+this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 
0);
+this.assignmentEpoch = new TimelineInteger(snapshotRegistry);
+this.assignments = new TimelineHashMap<>(snapshotRegistry, 0);
+this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 
0);
+}
+
+/**
+ * The type of this group.
+ *
+ * @return The group type (Consumer).
+ */
+@Override
+public GroupType type() {
+return GroupType.CONSUMER;
+}
+
+/**
+ * The state of this group.
+ *
+ * @return The current state as a String.
+ */
+@Override
+public String stateAsString() {
+return 

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-05-23 Thread via GitHub


gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1203138122


##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##
@@ -119,36 +120,37 @@ public Connect startConnect(Map 
workerProps, String... extraArgs
 
 log.info("Scanning for plugin classes. This might take a moment ...");
 Plugins plugins = new Plugins(workerProps);
-plugins.compareAndSwapWithDelegatingLoader();
-T config = createConfig(workerProps);
-log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+try (LoaderSwap loaderSwap = 
plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   Also here is some of the context for this change: 
https://github.com/apache/kafka/pull/13165#discussion_r1161929533
   
   Since the elimination of compareAndSwap is technically unrelated to the 
title change, it could be moved out to it's own PR. Let me know if you'd like 
me to separate the two changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203132909


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] gharris1727 commented on pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-05-23 Thread via GitHub


gharris1727 commented on PR #13165:
URL: https://github.com/apache/kafka/pull/13165#issuecomment-1560240995

   > I'm wondering if we can get better coverage for 
DelegatingClassLoader::scanPluginPath. Right now we verify in 
PluginsTest::newConnectorShouldInstantiateWithPluginClassLoader that if we've 
initialized a Plugins instance, and we invoke Plugins::newConnector, the 
constructor for that connector is called with the correct context classloader. 
But it seems like this isn't very powerful since, if the constructor is invoked 
multiple times, the last invocation's classloader will be recorded--so in this 
case, we're really testing Plugins::newConnector and not the instantiations 
that are performed during plugin discovery.
   
   Yeah this is a blind-spot in the existing tests. The "sampling" paradigm 
requires an instance of the object in order to perform the assertions, and the 
scanPluginPath implementation throws away the objects that it creates. The test 
does not and cannot assert that the TCCL is correct for the first version() 
call, for example.
   
   In this specific case the regression test is still sensitive, because the 
static initialization happens when the plugin constructor is first called (not 
when the Class object is created). This means that we can assert the TCCL 
used in the first constructor via the staticClassloader inspection.
   
   I think the alternative would involve mocking/spying part of the 
scanPluginPath (such as versionFor), or keeping track of instantiated objects 
in SamplingTestPlugins, both of which seem messy, and would make this harder to 
refactor in the near future. Do you think this should be addressed now, or can 
it wait until the plugin path scanning refactor is landed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203119594


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203119594


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203119594


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203106357


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203106357


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203084630


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203083017


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203079411


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1203077539


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] clayburn commented on pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-05-23 Thread via GitHub


clayburn commented on PR #13676:
URL: https://github.com/apache/kafka/pull/13676#issuecomment-1560105054

   I updated this PR to the latest version of Gradle Enterprise Gradle Plugin. 
Is there anything else I need to do to get this PR integrated?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-05-23 Thread via GitHub


gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1202962972


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
##
@@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable {
 
 private final ClassLoader savedLoader;
 
-public LoaderSwap(ClassLoader savedLoader) {
+public static LoaderSwap use(ClassLoader loader) {
+ClassLoader savedLoader = compareAndSwapLoaders(loader);
+try {
+return new LoaderSwap(savedLoader);
+} catch (Throwable t) {
+compareAndSwapLoaders(savedLoader);
+throw t;
+}
+}

Review Comment:
   This is not re-introducing the static logic, it is just refactoring to 
eliminate the open-ended Plugins.compareAndSwap* methods.
   
   This method is only called in two places: by 
DelegatingClassLoader.scanPluginPath (before scanning is finished) and 
Plugins.withClassLoader (after scanning is finished).
   
   I've dropped the visibility and made the DCL call-site mock-able.



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -89,9 +88,6 @@ public class EmbeddedConnectCluster {
 private final String workerNamePrefix;
 private final AtomicInteger nextWorkerId = new AtomicInteger(0);
 private final EmbeddedConnectClusterAssertions assertions;
-// we should keep the original class loader and set it back after 
connector stopped since the connector will change the class loader,
-// and then, the Mockito will use the unexpected class loader to generate 
the wrong proxy instance, which makes mock failed
-private final ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();

Review Comment:
   I disagree. I think that this is a symptom of the open-ended context 
classloader swap having unintended downstream effects. The existing fix is 
adequate, but is mostly addressing the symptom rather than the problem.



##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##
@@ -119,36 +120,37 @@ public Connect startConnect(Map 
workerProps, String... extraArgs
 
 log.info("Scanning for plugin classes. This might take a moment ...");
 Plugins plugins = new Plugins(workerProps);
-plugins.compareAndSwapWithDelegatingLoader();
-T config = createConfig(workerProps);
-log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+try (LoaderSwap loaderSwap = 
plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   I understand that this is a change in semantics, but that change is 
intentional. After this method completes, operations should not require the 
delegating loader and should be performed via the Connect handle. That handle 
only has methods for starting, stopping, and interacting with the REST API, all 
of which should internally handle setting the context classloader when 
appropriate.
   
   The reason that I'm changing this is that I think the open-ended swap 
methods are an anti-pattern, and lead to unexpected behavior later in the 
caller thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-05-23 Thread via GitHub


C0urante commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1202903632


##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##
@@ -119,36 +120,37 @@ public Connect startConnect(Map 
workerProps, String... extraArgs
 
 log.info("Scanning for plugin classes. This might take a moment ...");
 Plugins plugins = new Plugins(workerProps);
-plugins.compareAndSwapWithDelegatingLoader();
-T config = createConfig(workerProps);
-log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+try (LoaderSwap loaderSwap = 
plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   This is actually incorrect; we want the delegating loader to remain the 
classloader even after this method exits (normally or exceptionally).



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
##
@@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable {
 
 private final ClassLoader savedLoader;
 
-public LoaderSwap(ClassLoader savedLoader) {
+public static LoaderSwap use(ClassLoader loader) {
+ClassLoader savedLoader = compareAndSwapLoaders(loader);
+try {
+return new LoaderSwap(savedLoader);
+} catch (Throwable t) {
+compareAndSwapLoaders(savedLoader);
+throw t;
+}
+}

Review Comment:
   Adding static logic that invokes `compareAndSwapLoaders` is difficult to 
test, which was the motivation for KAFKA-14346. Can we try not to re-introduce 
that kind of static logic?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -360,17 +360,19 @@ private PluginScanResult scanPluginPath(
 builder.useParallelExecutor();
 Reflections reflections = new InternalReflections(builder);
 
-return new PluginScanResult(
-getPluginDesc(reflections, SinkConnector.class, loader),
-getPluginDesc(reflections, SourceConnector.class, loader),
-getPluginDesc(reflections, Converter.class, loader),
-getPluginDesc(reflections, HeaderConverter.class, loader),
-getTransformationPluginDesc(loader, reflections),
-getPredicatePluginDesc(loader, reflections),
-getServiceLoaderPluginDesc(ConfigProvider.class, loader),
-getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
-
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
-);
+try (LoaderSwap loaderSwap = LoaderSwap.use(loader)) {

Review Comment:
   If static initialization logic for a plugin class changes the context 
classloader, then that will remain the classloader for the rest of the plugin 
scanning that takes place in this method.
   
   I don't think we have to accommodate this case, but if there's an easy way 
to, we might try.



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -89,9 +88,6 @@ public class EmbeddedConnectCluster {
 private final String workerNamePrefix;
 private final AtomicInteger nextWorkerId = new AtomicInteger(0);
 private final EmbeddedConnectClusterAssertions assertions;
-// we should keep the original class loader and set it back after 
connector stopped since the connector will change the class loader,
-// and then, the Mockito will use the unexpected class loader to generate 
the wrong proxy instance, which makes mock failed
-private final ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();

Review Comment:
   I think we need to keep this since the change to 
`AbstractConnectCli::startConnect` is incorrect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13749: KAFKA-15016: Update LICENSE-binary file

2023-05-23 Thread via GitHub


divijvaidya commented on PR #13749:
URL: https://github.com/apache/kafka/pull/13749#issuecomment-1559996120

   `bcpkix` is still missing 
https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L61. Do 
you happen to know why?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1202846585


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1202846284


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1202840230


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-23 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1202837929


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13733: KAFKA-13337: fix of possible java.nio.file.AccessDeniedException during Connect plugin directory scan

2023-05-23 Thread via GitHub


gharris1727 commented on code in PR #13733:
URL: https://github.com/apache/kafka/pull/13733#discussion_r1202806994


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java:
##
@@ -22,6 +22,7 @@
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;

Review Comment:
   There's one more leaked classloader, in 
testLoadingMixOfValidAndInvalidPlugins if you want to fix that as well.
   I don't see these warnings as relevant, since this is test code. But I won't 
stop you from adding the try-with-resources.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #9910: KAFKA-10877

2023-05-23 Thread via GitHub


gharris1727 commented on PR #9910:
URL: https://github.com/apache/kafka/pull/9910#issuecomment-1559871005

   @smccauliff Are you still interested in making this change?
   I think this is still affecting users and wasting CPU cycles :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-23 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202738629


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
 });
 }
 
+/**
+ * Alter a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be 
altered
+ * @param connectorConfig the connector's configurations
+ * @param offsets  a mapping from partitions to offsets that need to be 
overwritten
+ * @param cb callback to invoke upon completion
+ */
+public void alterConnectorOffsets(String connName, Map 
connectorConfig,
+  Map, Map> 
offsets, Callback cb) {
+String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+Connector connector;
+
+try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+connector = plugins.newConnector(connectorClassOrAlias);
+if (ConnectUtils.isSinkConnector(connector)) {
+log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+} else {
+log.debug("Altering offsets for source connector: {}", 
connName);
+alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+}
+}
+}
+
+/**
+ * Alter a sink connector's consumer group offsets.
+ * 
+ * Visible for testing.
+ *
+ * @param connName the name of the sink connector whose offsets are to be 
altered
+ * @param connector an instance of the sink connector
+ * @param connectorConfig the sink connector's configuration
+ * @param offsets a mapping from topic partitions to offsets that need to 
be overwritten
+ * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+ * @param cb callback to invoke upon completion
+ */
+void alterSinkConnectorOffsets(String connName, Connector connector, 
Map connectorConfig,
+   Map, Map> 
offsets, ClassLoader connectorLoader, Callback cb) {
+executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+try {
+Map parsedOffsets = 
SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, parsedOffsets);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
+
+Class sinkConnectorClass = 
connector.getClass();
+Map adminConfig = adminConfigs(
+connName,
+"connector-worker-adminclient-" + connName,
+config,
+new SinkConnectorConfig(plugins, connectorConfig),
+sinkConnectorClass,
+connectorClientConfigOverridePolicy,
+kafkaClusterId,
+ConnectorType.SINK);
+
+SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, connectorConfig);
+String groupId = (String) baseConsumerConfigs(
+connName, "connector-consumer-", config, 
sinkConnectorConfig,
+sinkConnectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+Map offsetsToAlter = 
parsedOffsets.entrySet()
+.stream()
+.filter(entry -> entry.getValue() != null)
+.map(entry -> new 
AbstractMap.SimpleEntry<>(entry.getKey(), new 
OffsetAndMetadata(entry.getValue(
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+Set partitionsToReset = 
parsedOffsets.entrySet()
+.stream()
+.filter(entry -> entry.getValue() == null)
+.map(Map.Entry::getKey)
+.collect(Collectors.toSet());
+
+KafkaFuture adminFuture = 
KafkaFuture.completedFuture(null);
+
+Admin admin = adminFactory.apply(adminConfig);
+
+try {
+ 

[jira] [Commented] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors

2023-05-23 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725502#comment-17725502
 ] 

Chris Egerton commented on KAFKA-15018:
---

One possible fix for this could be to preemptively write tombstone offsets to 
the global offsets topic before writing any offsets to the per-connector 
offsets topic, and preserve the existing write logic for all non-tombstone 
offsets.

Tombstone offsets should be fairly rare and so in the common case, this will 
have no impact on connector performance or availability. However, when this 
case is hit, the proposed fix would require two synchronous writes to topics 
that are potentially hosted on different clusters. This is not ideal, but it's 
unclear whether a better alternative exists.

> Potential tombstone offsets corruption for exactly-once source connectors
> -
>
> Key: KAFKA-15018
> URL: https://issues.apache.org/jira/browse/KAFKA-15018
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1
>Reporter: Chris Egerton
>Priority: Major
>
> When exactly-once support is enabled for source connectors, source offsets 
> can potentially be written to two different offsets topics: a topic specific 
> to the connector, and the global offsets topic (which was used for all 
> connectors prior to KIP-618 / version 3.3.0).
> Precedence is given to offsets in the per-connector offsets topic, but if 
> none are found for a given partition, then the global offsets topic is used 
> as a fallback.
> When committing offsets, a transaction is used to ensure that source records 
> and source offsets are written to the Kafka cluster targeted by the source 
> connector. This transaction only includes the connector-specific offsets 
> topic. Writes to the global offsets topic take place after writes to the 
> connector-specific offsets topic have completed successfully, and if they 
> fail, a warning message is logged, but no other action is taken.
> Normally, this ensures that, for offsets committed by exactly-once-supported 
> source connectors, the per-connector offsets topic is at least as up-to-date 
> as the global offsets topic, and sometimes even ahead.
> However, for tombstone offsets, we lose that guarantee. If a tombstone offset 
> is successfully written to the per-connector offsets topic, but cannot be 
> written to the global offsets topic, then the global offsets topic will still 
> contain that source offset, but the per-connector topic will not. Due to the 
> fallback-on-global logic used by the worker, if a task requests offsets for 
> one of the tombstoned partitions, the worker will provide it with the offsets 
> present in the global offsets topic, instead of indicating to the task that 
> no offsets can be found.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors

2023-05-23 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15018:
-

 Summary: Potential tombstone offsets corruption for exactly-once 
source connectors
 Key: KAFKA-15018
 URL: https://issues.apache.org/jira/browse/KAFKA-15018
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.3.2, 3.3.1, 3.4.0, 3.3.0, 3.5.0, 3.4.1
Reporter: Chris Egerton


When exactly-once support is enabled for source connectors, source offsets can 
potentially be written to two different offsets topics: a topic specific to the 
connector, and the global offsets topic (which was used for all connectors 
prior to KIP-618 / version 3.3.0).

Precedence is given to offsets in the per-connector offsets topic, but if none 
are found for a given partition, then the global offsets topic is used as a 
fallback.

When committing offsets, a transaction is used to ensure that source records 
and source offsets are written to the Kafka cluster targeted by the source 
connector. This transaction only includes the connector-specific offsets topic. 
Writes to the global offsets topic take place after writes to the 
connector-specific offsets topic have completed successfully, and if they fail, 
a warning message is logged, but no other action is taken.

Normally, this ensures that, for offsets committed by exactly-once-supported 
source connectors, the per-connector offsets topic is at least as up-to-date as 
the global offsets topic, and sometimes even ahead.

However, for tombstone offsets, we lose that guarantee. If a tombstone offset 
is successfully written to the per-connector offsets topic, but cannot be 
written to the global offsets topic, then the global offsets topic will still 
contain that source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker, if a task requests offsets for one 
of the tombstoned partitions, the worker will provide it with the offsets 
present in the global offsets topic, instead of indicating to the task that no 
offsets can be found.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe closed pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-23 Thread via GitHub


cmccabe closed pull request #13686: MINOR: Create the MetadataNode classes to 
introspect MetadataImage
URL: https://github.com/apache/kafka/pull/13686


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-23 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202709620


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
 });
 }
 
+/**
+ * Alter a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be 
altered
+ * @param connectorConfig the connector's configurations
+ * @param offsets  a mapping from partitions to offsets that need to be 
overwritten
+ * @param cb callback to invoke upon completion
+ */
+public void alterConnectorOffsets(String connName, Map 
connectorConfig,
+  Map, Map> 
offsets, Callback cb) {
+String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+Connector connector;
+
+try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+connector = plugins.newConnector(connectorClassOrAlias);
+if (ConnectUtils.isSinkConnector(connector)) {
+log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+} else {
+log.debug("Altering offsets for source connector: {}", 
connName);
+alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+}
+}
+}
+
+/**
+ * Alter a sink connector's consumer group offsets.
+ * 
+ * Visible for testing.
+ *
+ * @param connName the name of the sink connector whose offsets are to be 
altered
+ * @param connector an instance of the sink connector
+ * @param connectorConfig the sink connector's configuration
+ * @param offsets a mapping from topic partitions to offsets that need to 
be overwritten
+ * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+ * @param cb callback to invoke upon completion
+ */
+void alterSinkConnectorOffsets(String connName, Connector connector, 
Map connectorConfig,
+   Map, Map> 
offsets, ClassLoader connectorLoader, Callback cb) {
+executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+try {
+Map parsedOffsets = 
SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, parsedOffsets);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
+
+Class sinkConnectorClass = 
connector.getClass();
+Map adminConfig = adminConfigs(
+connName,
+"connector-worker-adminclient-" + connName,
+config,
+new SinkConnectorConfig(plugins, connectorConfig),
+sinkConnectorClass,
+connectorClientConfigOverridePolicy,
+kafkaClusterId,
+ConnectorType.SINK);
+
+SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, connectorConfig);
+String groupId = (String) baseConsumerConfigs(
+connName, "connector-consumer-", config, 
sinkConnectorConfig,
+sinkConnectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+Map offsetsToAlter = 
parsedOffsets.entrySet()
+.stream()
+.filter(entry -> entry.getValue() != null)
+.map(entry -> new 
AbstractMap.SimpleEntry<>(entry.getKey(), new 
OffsetAndMetadata(entry.getValue(
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+Set partitionsToReset = 
parsedOffsets.entrySet()
+.stream()
+.filter(entry -> entry.getValue() == null)
+.map(Map.Entry::getKey)
+.collect(Collectors.toSet());
+
+KafkaFuture adminFuture = 
KafkaFuture.completedFuture(null);
+
+Admin admin = adminFactory.apply(adminConfig);
+
+try {
+ 

[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-23 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202694716


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1235,6 +1244,246 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
 });
 }
 
+/**
+ * Alter a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be 
altered
+ * @param connectorConfig the connector's configurations
+ * @param offsets a mapping from partitions (either source partitions for 
source connectors, or Kafka topic
+ *partitions for sink connectors) to offsets that need to 
be written; may not be null or empty
+ * @param cb callback to invoke upon completion
+ */
+public void alterConnectorOffsets(String connName, Map 
connectorConfig,
+  Map, Map> 
offsets, Callback cb) {
+
+if (offsets == null || offsets.isEmpty()) {
+throw new ConnectException("The offsets to be altered may not be 
null or empty");
+}
+
+String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+Connector connector;
+
+try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+connector = plugins.newConnector(connectorClassOrAlias);
+if (ConnectUtils.isSinkConnector(connector)) {
+log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+} else {
+log.debug("Altering offsets for source connector: {}", 
connName);
+alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+}
+}
+}
+
+/**
+ * Alter a sink connector's consumer group offsets.
+ * 
+ * Visible for testing.
+ *
+ * @param connName the name of the sink connector whose offsets are to be 
altered
+ * @param connector an instance of the sink connector
+ * @param connectorConfig the sink connector's configuration
+ * @param offsets a mapping from topic partitions to offsets that need to 
be written; may not be null or empty
+ * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+ * @param cb callback to invoke upon completion
+ */
+void alterSinkConnectorOffsets(String connName, Connector connector, 
Map connectorConfig,
+   Map, Map> 
offsets, ClassLoader connectorLoader, Callback cb) {
+executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+try {
+Map parsedOffsets = 
SinkUtils.parseSinkConnectorOffsets(offsets);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, parsedOffsets);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
+
+SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, connectorConfig);
+Class sinkConnectorClass = 
connector.getClass();
+Map adminConfig = adminConfigs(
+connName,
+"connector-worker-adminclient-" + connName,
+config,
+sinkConnectorConfig,
+sinkConnectorClass,
+connectorClientConfigOverridePolicy,
+kafkaClusterId,
+ConnectorType.SINK);
+
+String groupId = (String) baseConsumerConfigs(
+connName, "connector-consumer-", config, 
sinkConnectorConfig,
+sinkConnectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+Admin admin = adminFactory.apply(adminConfig);
+
+try {
+KafkaFuture adminFuture = 
KafkaFuture.completedFuture(null);

Review Comment:
   Can we construct a `List>` here and add elements to that 
as necessary (i.e., depending on whether the sets of to-be-removed and 
to-be-altered partitions are empty), then combine then with `KafkaFuture 
adminFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));`?
   
   The use of a no-op future is a little hard to read.



##

[GitHub] [kafka] yashmayya opened a new pull request, #13750: MINOR: Handle the config topic read timeout edge case in DistributedHerder's stopConnector method

2023-05-23 Thread via GitHub


yashmayya opened a new pull request, #13750:
URL: https://github.com/apache/kafka/pull/13750

   - Handle the config topic read timeout edge case in 
`DistributedHerder::stopConnector`
   - https://github.com/apache/kafka/pull/13465#discussion_r1200500990
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-23 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202684769


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java:
##
@@ -52,4 +53,82 @@ public static ConnectorOffsets 
consumerGroupOffsetsToConnectorOffsets(Map
+ * {
+ *   "kafka_topic": "topic"
+ *   "kafka_partition": 3
+ * }
+ * 
+ *
+ * and that the provided offsets (values in the {@code partitionOffsets} 
map) look like:
+ * 
+ * {
+ *   "kafka_offset": 1000
+ * }
+ * 
+ *
+ * This method then parses them into a mapping from {@link 
TopicPartition}s to their corresponding {@link Long}
+ * valued offsets.
+ *
+ * @param partitionOffsets the partitions to offset map that needs to be 
validated and parsed.
+ * @return the parsed mapping from {@link TopicPartition} to its 
corresponding {@link Long} valued offset.
+ *
+ * @throws BadRequestException if the provided offsets aren't in the 
expected format
+ */
+public static Map 
validateAndParseSinkConnectorOffsets(Map, Map> 
partitionOffsets) {

Review Comment:
   Well done, looks great 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-23 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202678391


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, 
Callback cb) {
 );
 }
 
+@Override
+public void alterConnectorOffsets(String connName, Map, 
Map> offsets, Callback callback) {
+log.trace("Submitting alter offsets request for connector '{}'", 
connName);
+
+addRequest(() -> {
+refreshConfigSnapshot(workerSyncTimeoutMs);
+if (!alterConnectorOffsetsChecks(connName, callback)) {
+return null;
+}
+// At this point, we should be the leader (the call to 
alterConnectorOffsetsChecks makes sure of that) and can safely run
+// a zombie fencing request
+if (isSourceConnector(connName) && 
config.exactlyOnceSourceEnabled()) {
+log.debug("Performing a round of zombie fencing before 
altering offsets for source connector {} with exactly-once semantics enabled.", 
connName);
+getFenceZombieSourceTasksCallable(connName, (error, ignored) 
-> {
+if (error != null) {
+log.error("Failed to perform zombie fencing for 
exactly-once source connector prior to altering offsets", error);
+callback.onCompletion(new ConnectException("Failed to 
perform zombie fencing for exactly-once source connector prior to altering 
offsets",
+error), null);
+} else {
+log.debug("Successfully completed zombie fencing for 
source connector {}; proceeding to alter offsets.", connName);
+// We need to ensure that we perform the necessary 
checks again inside alterConnectorOffsetsHerderRequest
+// since it is being run in a separate herder request 
and the conditions could have changed since the
+// previous check
+addRequest(getAlterConnectorOffsetsCallable(connName, 
offsets, callback), forwardErrorCallback(callback));
+}
+}).call();
+} else {
+getAlterConnectorOffsetsCallable(connName, offsets, 
callback).call();

Review Comment:
   One convention I've seen used is `foo` (public) and `doFoo` (internal). I'm 
not a huge fan of it but in some cases it can be useful.
   
   Also, are we issuing unnecessary calls to `alterConnectorOffsetsChecks`? We 
do a check in the beginning of `alterConnectorOffsets` and then, if the 
connector is a sink (or exactly-once source support is disabled), we 
immediately invoke the `Callable` returned by 
`getAlterConnectorOffsetsCallable`, which calls `alterConnectorOffsetsChecks` a 
second time.
   
   We might just get rid of `getAlterConnectorOffsetsCallable` altogether and 
inline the logic for it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-23 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202644196


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, 
Callback cb) {
 );
 }
 
+@Override
+public void alterConnectorOffsets(String connName, Map, 
Map> offsets, Callback callback) {
+log.trace("Submitting alter offsets request for connector '{}'", 
connName);
+
+addRequest(() -> {
+refreshConfigSnapshot(workerSyncTimeoutMs);

Review Comment:
   Good point--yes, we should handle that in `stopConnector` as well (although 
we don't have to do that in this PR).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] KarboniteKream commented on pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse

2023-05-23 Thread via GitHub


KarboniteKream commented on PR #13679:
URL: https://github.com/apache/kafka/pull/13679#issuecomment-1559681643

   This PR seems to have introduced a regression (confirmed using bisect). In a 
simple setup of two controllers using `config/kraft/controller.properties`, 
after the leader is shut down and restarted, `UNKNOWN_SERVER_EXCEPTION` will be 
thrown by `ApiVersionsRequest`.
   
   The other controller sees the following exception:
   ```
   [2023-05-19 15:50:18,834] WARN [QuorumController id=0] getFinalizedFeatures: 
failed with unknown server exception RuntimeException in 28 us.  The controller 
is already in standby mode. (org.apache.kafka.controller.QuorumController)
   java.lang.RuntimeException: No in-memory snapshot for epoch 84310. Snapshot 
epochs are: 61900
   at 
org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:173)
   at 
org.apache.kafka.timeline.SnapshotRegistry.iterator(SnapshotRegistry.java:131)
   at org.apache.kafka.timeline.TimelineObject.get(TimelineObject.java:69)
   at 
org.apache.kafka.controller.FeatureControlManager.finalizedFeatures(FeatureControlManager.java:303)
   at 
org.apache.kafka.controller.QuorumController.lambda$finalizedFeatures$16(QuorumController.java:2016)
   at 
org.apache.kafka.controller.QuorumController$ControllerReadEvent.run(QuorumController.java:546)
   at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
   at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
   at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
   at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   
   A similar issue was reported in 
[KAFKA-14996](https://issues.apache.org/jira/browse/KAFKA-14996). I'll report 
this in Jira once my account is approved.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison opened a new pull request, #13749: KAFKA-15016: Update LICENSE-binary file

2023-05-23 Thread via GitHub


mimaison opened a new pull request, #13749:
URL: https://github.com/apache/kafka/pull/13749

   The file was getting a bit out of sync with the actual dependencies we ship. 
Also the [process we follow each 
release](https://issues.apache.org/jira/browse/KAFKA-12622) catches missing 
licenses but does not clear dependencies not used anymore, so this removes a 
few unnecessary entries too.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #13741: KAFKA-15009: Handle errors while migrating ACL metadata from snapshot to Zk during DUAL_WRITE mode.

2023-05-23 Thread via GitHub


mumrah merged PR #13741:
URL: https://github.com/apache/kafka/pull/13741


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15009) New ACLs are not written to ZK during migration

2023-05-23 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15009:
-
Component/s: kraft

> New ACLs are not written to ZK during migration
> ---
>
> Key: KAFKA-15009
> URL: https://issues.apache.org/jira/browse/KAFKA-15009
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Akhilesh Chaganti
>Assignee: Akhilesh Chaganti
>Priority: Blocker
>  Labels: kraft, migration
>
> While handling snapshots in dual-write mode, we are missing the logic to 
> detect new ACLs created in KRaft. This means we will not write these new ACLs 
> back to ZK and they would be missing if a user rolled back their cluster to 
> ZK mode. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on pull request #13741: KAFKA-15009: Handle errors while migrating ACL metadata from snapshot to Zk during DUAL_WRITE mode.

2023-05-23 Thread via GitHub


mumrah commented on PR #13741:
URL: https://github.com/apache/kafka/pull/13741#issuecomment-1559562400

   Ok, I split out the client quotas work into KAFKA-15017


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15017) New ClientQuotas are not written to ZK from snapshot

2023-05-23 Thread David Arthur (Jira)
David Arthur created KAFKA-15017:


 Summary: New ClientQuotas are not written to ZK from snapshot 
 Key: KAFKA-15017
 URL: https://issues.apache.org/jira/browse/KAFKA-15017
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.5.0
Reporter: David Arthur


Similar issue to KAFKA-15009



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15009) New ACLs are not written to ZK during migration

2023-05-23 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15009:
-
Description: While handling snapshots in dual-write mode, we are missing 
the logic to detect new ACLs created in KRaft. This means we will not write 
these new ACLs back to ZK and they would be missing if a user rolled back their 
cluster to ZK mode. 

> New ACLs are not written to ZK during migration
> ---
>
> Key: KAFKA-15009
> URL: https://issues.apache.org/jira/browse/KAFKA-15009
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0
>Reporter: Akhilesh Chaganti
>Assignee: Akhilesh Chaganti
>Priority: Blocker
>  Labels: kraft, migration
>
> While handling snapshots in dual-write mode, we are missing the logic to 
> detect new ACLs created in KRaft. This means we will not write these new ACLs 
> back to ZK and they would be missing if a user rolled back their cluster to 
> ZK mode. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15009) New ACLs are not written to ZK during migration

2023-05-23 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur reassigned KAFKA-15009:


Assignee: Akhilesh Chaganti  (was: David Arthur)

> New ACLs are not written to ZK during migration
> ---
>
> Key: KAFKA-15009
> URL: https://issues.apache.org/jira/browse/KAFKA-15009
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0
>Reporter: Akhilesh Chaganti
>Assignee: Akhilesh Chaganti
>Priority: Blocker
>  Labels: kraft, migration
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15009) New ACLs are not written to ZK during migration

2023-05-23 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur reassigned KAFKA-15009:


Assignee: David Arthur  (was: Akhilesh Chaganti)

> New ACLs are not written to ZK during migration
> ---
>
> Key: KAFKA-15009
> URL: https://issues.apache.org/jira/browse/KAFKA-15009
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0
>Reporter: Akhilesh Chaganti
>Assignee: David Arthur
>Priority: Blocker
>  Labels: kraft, migration
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15009) New ACLs are not written to ZK during migration

2023-05-23 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15009:
-
Summary: New ACLs are not written to ZK during migration  (was: 
ClientQuotas and ACLs are not correctly synchronized while handling snapshot 
during migration (dual write))

> New ACLs are not written to ZK during migration
> ---
>
> Key: KAFKA-15009
> URL: https://issues.apache.org/jira/browse/KAFKA-15009
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0
>Reporter: Akhilesh Chaganti
>Assignee: Akhilesh Chaganti
>Priority: Blocker
>  Labels: kraft, migration
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] krespo opened a new pull request, #13748: [BUGFIX] Bugfixed in KAFKA-8713, but it doesn't work properly.

2023-05-23 Thread via GitHub


krespo opened a new pull request, #13748:
URL: https://github.com/apache/kafka/pull/13748

   
   Through the ticket 
["KAFKA-8713"](https://issues.apache.org/jira/browse/KAFKA-8713), a bug that 
always outputs "default value" when JsonConverter is a nullable schema was 
fixed.
   
   After applying the bug fix, I set the following settings in kafka connect.
   
   `
   "value.converter.replace.null.with.default": "false",
   `
   However, I found that the same problem still occurred, so I modified the 
code to fix this bug.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13747: MINOR: Fix ListOffsetsRequestTest.testResponseIncludesLeaderEpoch

2023-05-23 Thread via GitHub


dajac commented on code in PR #13747:
URL: https://github.com/apache/kafka/pull/13747#discussion_r1202331207


##
core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala:
##
@@ -192,7 +192,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 killBroker(firstLeaderId)
 val secondLeaderId = TestUtils.awaitLeaderChange(servers, partition, 
firstLeaderId)
 // make sure high watermark of new leader has caught up
-TestUtils.waitUntilTrue(() => sendRequest(secondLeaderId, 0L, 
-1).errorCode() != Errors.OFFSET_NOT_AVAILABLE.code(),
+TestUtils.waitUntilTrue(() => sendRequest(secondLeaderId, 
ListOffsetsRequest.LATEST_TIMESTAMP, -1).errorCode != 
Errors.OFFSET_NOT_AVAILABLE.code,

Review Comment:
   `OFFSET_NOT_AVAILABLE` is only returned when `LATEST_TIMESTAMP` or 
`MAX_TIMESTAMP` are used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #13747: MINOR: Fix ListOffsetsRequestTest.testResponseIncludesLeaderEpoch

2023-05-23 Thread via GitHub


dajac opened a new pull request, #13747:
URL: https://github.com/apache/kafka/pull/13747

   Fix flaky ListOffsetsRequestTest.testResponseIncludesLeaderEpoch.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-23 Thread via GitHub


dajac commented on PR #13535:
URL: https://github.com/apache/kafka/pull/13535#issuecomment-1559262765

   @satishd Weird... It fails all the time on my laptop.
   
   ```
   Gradle Test Run :core:test > Gradle Test Executor 9 > ListOffsetsRequestTest 
> testResponseIncludesLeaderEpoch() FAILED
   org.opentest4j.AssertionFailedError: expected: <(10,1,0)> but was: 
<(-1,-1,78)>
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
   at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
   at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
   at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1142)
   at 
app//kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch(ListOffsetsRequestTest.scala:210)
   ```
   
   ```
   % git rev-parse --verify HEAD
   15f8705246e094f7825b76a38d9f12f95d626ee5
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] sanjanagoyal opened a new pull request, #13746: removed recreation of partition to fix partitions having uneven data disctribution

2023-05-23 Thread via GitHub


sanjanagoyal opened a new pull request, #13746:
URL: https://github.com/apache/kafka/pull/13746

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] atu-sharm commented on pull request #13745: KAFKA-15015: Explicit on reload4j version

2023-05-23 Thread via GitHub


atu-sharm commented on PR #13745:
URL: https://github.com/apache/kafka/pull/13745#issuecomment-1559239546

   Output:
   ```
   reload4j-1.2.25.jar
   slf4j-reload4j-1.7.36.jar
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13745: KAFKA-15015: Explicit on reload4j version

2023-05-23 Thread via GitHub


showuon commented on PR #13745:
URL: https://github.com/apache/kafka/pull/13745#issuecomment-1559220469

   @atu-sharm , could you run these commands to show what's the output of the 
lib folder for reload library?
   ```
   // build the binary artifacts
   $ ./gradlewAll releaseTarGz
   
   // unpack the binary artifact 
   $ tar xf core/build/distributions/kafka_2.13-X.Y.Z.tgz
   $ cd xf kafka_2.13-X.Y.Z
   
   // list the packaged jars 
   // (you can ignore the jars for our own modules, like kafka, kafka-clients, 
etc.)
   $ ls libs | grep reload
   ```
   
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-23 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1184873142


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -670,6 +875,14 @@ public void close() {
 } catch (InterruptedException e) {
 // ignore
 }
+remoteStorageReaderThreadPool.shutdownNow();
+//waits for 2 mins to terminate the current tasks
+try {
+remoteStorageReaderThreadPool.awaitTermination(2, 
TimeUnit.MINUTES);

Review Comment:
   It does not require that to be completed in 5 mins. 
`lifecycleManager.controlledShutdownFuture` is more about processing the 
controlled shutdown event to the controller for that broker. It will wait for 5 
mins before proceeding with other sequence of actions. But that will not get 
affected because of the code introduced here. 
   Logging subsystem handles unclean shutdown for log segments and it would 
have been already finished before RemoteLogManager is closed. So, they will not 
get affected because of this timeout. But we can have a short duration here 
like 10 secs, we can revisit introducing a config if it is really needed for 
closing the remote log subsystem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-23 Thread via GitHub


satishd commented on PR #13535:
URL: https://github.com/apache/kafka/pull/13535#issuecomment-1559216261

   @dajac It is passed locally on my laptop.
   
   ```
   Gradle Test Run :core:test > Gradle Test Executor 65 > 
ListOffsetsRequestTest > 
testResponseDefaultOffsetAndLeaderEpochForAllVersions() PASSED
   
   Gradle Test Run :core:test > Gradle Test Executor 65 > 
ListOffsetsRequestTest > testListOffsetsMaxTimeStampOldestVersion() PASSED
   
   Gradle Test Run :core:test > Gradle Test Executor 65 > 
ListOffsetsRequestTest > testListOffsetsErrorCodes() PASSED
   
   Gradle Test Run :core:test > Gradle Test Executor 65 > 
ListOffsetsRequestTest > testCurrentEpochValidation() PASSED
   
   Gradle Test Run :core:test > Gradle Test Executor 65 > 
ListOffsetsRequestTest > testResponseIncludesLeaderEpoch() PASSED
   
   BUILD SUCCESSFUL in 1m 8s
   55 actionable tasks: 6 executed, 49 up-to-date
   ➜  kafka git:(apache-trunk) date
   Tue May 23 18:00:01 IST 2023
   ➜  kafka git:(apache-trunk) git rev-parse --verify HEAD
   15f8705246e094f7825b76a38d9f12f95d626ee5
   ➜  kafka git:(apache-trunk)
   ```
   
   ```
   > Task :core:test
   
   Gradle Test Run :core:test > Gradle Test Executor 71 > 
ListOffsetsRequestWithRemoteStoreTest > 
testResponseDefaultOffsetAndLeaderEpochForAllVersions() PASSED
   
   Gradle Test Run :core:test > Gradle Test Executor 71 > 
ListOffsetsRequestWithRemoteStoreTest > 
testListOffsetsMaxTimeStampOldestVersion() PASSED
   
   Gradle Test Run :core:test > Gradle Test Executor 71 > 
ListOffsetsRequestWithRemoteStoreTest > testListOffsetsErrorCodes() PASSED
   
   Gradle Test Run :core:test > Gradle Test Executor 71 > 
ListOffsetsRequestWithRemoteStoreTest > testCurrentEpochValidation() PASSED
   
   Gradle Test Run :core:test > Gradle Test Executor 71 > 
ListOffsetsRequestWithRemoteStoreTest > testResponseIncludesLeaderEpoch() PASSED
   
   BUILD SUCCESSFUL in 1m 9s
   55 actionable tasks: 6 executed, 49 up-to-date
   ➜  kafka git:(apache-trunk) date
   Tue May 23 18:05:20 IST 2023
   ➜  kafka git:(apache-trunk) git rev-parse --verify HEAD
   15f8705246e094f7825b76a38d9f12f95d626ee5
   ➜  kafka git:(apache-trunk)
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15015) Binaries contain 2 versions of reload4j

2023-05-23 Thread Atul Sharma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Atul Sharma reassigned KAFKA-15015:
---

Assignee: Atul Sharma

> Binaries contain 2 versions of reload4j
> ---
>
> Key: KAFKA-15015
> URL: https://issues.apache.org/jira/browse/KAFKA-15015
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0, 3.4.1
>Reporter: Mickael Maison
>Assignee: Atul Sharma
>Priority: Major
>
> These releases ship 2 versions of reload4j:
> - reload4j-1.2.19.jar
> - reload4j-1.2.25.jar



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] atu-sharm commented on pull request #13745: KAFKA-15015: Explicit on reload4j version

2023-05-23 Thread via GitHub


atu-sharm commented on PR #13745:
URL: https://github.com/apache/kafka/pull/13745#issuecomment-1559203084

   @showuon @mimaison can you review?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15015) Binaries contain 2 versions of reload4j

2023-05-23 Thread Atul Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725390#comment-17725390
 ] 

Atul Sharma commented on KAFKA-15015:
-

[~mimaison] [~showuon] raised PR [GitHub Pull Request 
#13745|https://github.com/apache/kafka/pull/13745]

> Binaries contain 2 versions of reload4j
> ---
>
> Key: KAFKA-15015
> URL: https://issues.apache.org/jira/browse/KAFKA-15015
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0, 3.4.1
>Reporter: Mickael Maison
>Priority: Major
>
> These releases ship 2 versions of reload4j:
> - reload4j-1.2.19.jar
> - reload4j-1.2.25.jar



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] atu-sharm opened a new pull request, #13745: KAFKA-15015: Explicit on reload4j version

2023-05-23 Thread via GitHub


atu-sharm opened a new pull request, #13745:
URL: https://github.com/apache/kafka/pull/13745

   Defining a specific version for reload4j
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-23 Thread via GitHub


yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1200530069


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, 
Callback cb) {
 );
 }
 
+@Override
+public void alterConnectorOffsets(String connName, Map, 
Map> offsets, Callback callback) {
+log.trace("Submitting alter offsets request for connector '{}'", 
connName);
+
+addRequest(() -> {
+refreshConfigSnapshot(workerSyncTimeoutMs);
+if (!alterConnectorOffsetsChecks(connName, callback)) {
+return null;
+}
+// At this point, we should be the leader (the call to 
alterConnectorOffsetsChecks makes sure of that) and can safely run
+// a zombie fencing request
+if (isSourceConnector(connName) && 
config.exactlyOnceSourceEnabled()) {
+log.debug("Performing a round of zombie fencing before 
altering offsets for source connector {} with exactly-once semantics enabled.", 
connName);
+getFenceZombieSourceTasksCallable(connName, (error, ignored) 
-> {
+if (error != null) {
+log.error("Failed to perform zombie fencing for 
exactly-once source connector prior to altering offsets", error);
+callback.onCompletion(new ConnectException("Failed to 
perform zombie fencing for exactly-once source connector prior to altering 
offsets",
+error), null);
+} else {
+log.debug("Successfully completed zombie fencing for 
source connector {}; proceeding to alter offsets.", connName);
+// We need to ensure that we perform the necessary 
checks again inside alterConnectorOffsetsHerderRequest
+// since it is being run in a separate herder request 
and the conditions could have changed since the
+// previous check
+addRequest(getAlterConnectorOffsetsCallable(connName, 
offsets, callback), forwardErrorCallback(callback));
+}
+}).call();
+} else {
+getAlterConnectorOffsetsCallable(connName, offsets, 
callback).call();

Review Comment:
   I think one of the main motivations here (and above) was naming  
   
   We already have `fenceZombieSourceTasks` and `alterConnectorOffsets` 
interface methods but there was a need to break out some of the logic from both 
into separate methods for re-use. I'm happy to take any naming suggestions to 
make them synchronous and wrap them into callables at the relevant call sites.
   
   Maybe something like `fenceZombieSourceTasksSync` / 
`alterConnectorOffsetsSync` or `fenceZombieSourceTasksInternal` / 
`alterConnectorOffsetsInternal`? I don't particularly like either though, 
naming is hard...



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, 
Callback cb) {
 );
 }
 
+@Override
+public void alterConnectorOffsets(String connName, Map, 
Map> offsets, Callback callback) {
+log.trace("Submitting alter offsets request for connector '{}'", 
connName);
+
+addRequest(() -> {
+refreshConfigSnapshot(workerSyncTimeoutMs);

Review Comment:
   Thanks, both points make sense, done. Btw shouldn't we also handle the read 
timeout case in the 
[stopConnector](https://github.com/apache/kafka/blob/e96a463561ca8974fca37562b8675ae8ae4aff29/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1107)
 method? The `KafkaConfigBackingStore::putTaskConfigs` method does include a 
read to the end of the config topic at the end but the subsequently called 
`KafkaConfigBackingStore::putTargetState` method doesn't.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
 });
 }
 
+/**
+ * Alter a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be 
altered
+ * @param connectorConfig the connector's configurations
+ * @param offsets  a mapping from partitions to offsets that need to be 
overwritten
+ * @param cb callback to invoke upon completion
+ */
+public void alterConnectorOffsets(String connName, Map 
connectorConfig,
+  Map, Map> 
offsets, Callback cb) {
+String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = 

[GitHub] [kafka] divijvaidya commented on pull request #13662: MINOR: Upgrade Jackson dependencies to version 2.15.0

2023-05-23 Thread via GitHub


divijvaidya commented on PR #13662:
URL: https://github.com/apache/kafka/pull/13662#issuecomment-1559122983

   I found today that there is another change that needs to make it in this PR. 
We need to update the LICENSE file with the correct version at 
https://github.com/apache/kafka/blob/trunk/LICENSE-binary#L211 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2023-05-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725357#comment-17725357
 ] 

Matthias J. Sax commented on KAFKA-7497:


Seems to be fixed. Cf https://issues.apache.org/jira/browse/KAFKA-14209 

> Kafka Streams should support self-join on streams
> -
>
> Key: KAFKA-7497
> URL: https://issues.apache.org/jira/browse/KAFKA-7497
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Robin Moffatt
>Priority: Major
>  Labels: needs-kip
>
> There are valid reasons to want to join a stream to itself, but Kafka Streams 
> does not currently support this ({{Invalid topology: Topic foo has already 
> been registered by another source.}}).  To perform the join requires creating 
> a second stream as a clone of the first, and then doing a join between the 
> two. This is a clunky workaround and results in unnecessary duplication of 
> data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-7497) Kafka Streams should support self-join on streams

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7497.

Resolution: Fixed

> Kafka Streams should support self-join on streams
> -
>
> Key: KAFKA-7497
> URL: https://issues.apache.org/jira/browse/KAFKA-7497
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Robin Moffatt
>Priority: Major
>  Labels: needs-kip
>
> There are valid reasons to want to join a stream to itself, but Kafka Streams 
> does not currently support this ({{Invalid topology: Topic foo has already 
> been registered by another source.}}).  To perform the join requires creating 
> a second stream as a clone of the first, and then doing a join between the 
> two. This is a clunky workaround and results in unnecessary duplication of 
> data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on pull request #13744: MINOR: update 3.4.1 licence

2023-05-23 Thread via GitHub


mimaison commented on PR #13744:
URL: https://github.com/apache/kafka/pull/13744#issuecomment-1559077621

   @divijvaidya For the "audit" I filled 
https://issues.apache.org/jira/browse/KAFKA-15016 yesterday.
   The validation we use to check what the LICENSE file should include is 
documented in https://issues.apache.org/jira/browse/KAFKA-12622


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14173) TopologyTestDriver does not use mock wall clock time when sending test records

2023-05-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725356#comment-17725356
 ] 

Matthias J. Sax commented on KAFKA-14173:
-

Just discovering this ticket.

I guess you would need to use `TestInputTopic#advanceTime` for this case?

Closing the ticket as "no an issue", as the API is there. Feel free to follow 
up.

> TopologyTestDriver does not use mock wall clock time when sending test records
> --
>
> Key: KAFKA-14173
> URL: https://issues.apache.org/jira/browse/KAFKA-14173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.1
>Reporter: Guido Josquin
>Priority: Minor
>
> I am trying to test a stream-stream join with `TopologyTestDriver`. My goal 
> is to confirm that my topology performs the following left join correctly.
> {code:java}
> bills
>   .leftJoin(payments)(
> {
>   case (billValue, null) => billValue
>   case (billValue, paymentValue) => (billValue.toInt - 
> paymentValue.toInt).toString
> },
> JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
>   )
>   .to("debt")
> {code}
>  
> In other words, if we see a `bill` and a `payment` within 100ms, the payment 
> should be subtracted from the bill. If we do not see a payment, the debt is 
> simply the bill.
> Here is the test code.
> {code:java}
> val simpleLeftJoinTopology = new SimpleLeftJoinTopology
> val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
> val serde = Serdes.stringSerde
> val bills = driver.createInputTopic("bills", serde.serializer, 
> serde.serializer)
> val payments = driver.createInputTopic("payments", serde.serializer, 
> serde.serializer)
> val debt = driver.createOutputTopic("debt", serde.deserializer, 
> serde.deserializer)
> bills.pipeInput("fred", "100")
> bills.pipeInput("george", "20")
> payments.pipeInput("fred", "95")
> // When in doubt, sleep twice
> driver.advanceWallClockTime(Duration.ofMillis(500))
> Thread.sleep(500)
> // Send a new record to cause the previous window to be closed
> payments.pipeInput("percy", "0")
> val keyValues = debt.readKeyValuesToList()
> keyValues should contain theSameElementsAs Seq(
>   // This record is present
>   new KeyValue[String, String]("fred", "5"),
>   // This record is missing
>   new KeyValue[String, String]("george", "20")
> )
> {code}
> Full code available at [https://github.com/Oduig/kstreams-left-join-example]
> Is seems that advancing the wall clock time, sleeping, or sending an extra 
> record, never triggers the join condition when data only arrives on the left 
> side. It is possible to circumvent this by passing an explicit event time 
> with each test record. (See 
> https://stackoverflow.com/questions/73443812/using-kafka-streams-topologytestdriver-how-to-test-left-join-between-two-strea/73540161#73540161)
>  
> However, the behavior deviates from a real Kafka broker. With a real broker, 
> if we do not send an event, it uses the wall clock time of the broker 
> instead. The behavior under test should be the same: 
> `driver.advanceWallClockTime` should provide the default time to be used for 
> `TestTopic.pipeInput`, when no other time is specified.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14173) TopologyTestDriver does not use mock wall clock time when sending test records

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14173.
-
Resolution: Not A Problem

> TopologyTestDriver does not use mock wall clock time when sending test records
> --
>
> Key: KAFKA-14173
> URL: https://issues.apache.org/jira/browse/KAFKA-14173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.1
>Reporter: Guido Josquin
>Priority: Minor
>
> I am trying to test a stream-stream join with `TopologyTestDriver`. My goal 
> is to confirm that my topology performs the following left join correctly.
> {code:java}
> bills
>   .leftJoin(payments)(
> {
>   case (billValue, null) => billValue
>   case (billValue, paymentValue) => (billValue.toInt - 
> paymentValue.toInt).toString
> },
> JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
>   )
>   .to("debt")
> {code}
>  
> In other words, if we see a `bill` and a `payment` within 100ms, the payment 
> should be subtracted from the bill. If we do not see a payment, the debt is 
> simply the bill.
> Here is the test code.
> {code:java}
> val simpleLeftJoinTopology = new SimpleLeftJoinTopology
> val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
> val serde = Serdes.stringSerde
> val bills = driver.createInputTopic("bills", serde.serializer, 
> serde.serializer)
> val payments = driver.createInputTopic("payments", serde.serializer, 
> serde.serializer)
> val debt = driver.createOutputTopic("debt", serde.deserializer, 
> serde.deserializer)
> bills.pipeInput("fred", "100")
> bills.pipeInput("george", "20")
> payments.pipeInput("fred", "95")
> // When in doubt, sleep twice
> driver.advanceWallClockTime(Duration.ofMillis(500))
> Thread.sleep(500)
> // Send a new record to cause the previous window to be closed
> payments.pipeInput("percy", "0")
> val keyValues = debt.readKeyValuesToList()
> keyValues should contain theSameElementsAs Seq(
>   // This record is present
>   new KeyValue[String, String]("fred", "5"),
>   // This record is missing
>   new KeyValue[String, String]("george", "20")
> )
> {code}
> Full code available at [https://github.com/Oduig/kstreams-left-join-example]
> Is seems that advancing the wall clock time, sleeping, or sending an extra 
> record, never triggers the join condition when data only arrives on the left 
> side. It is possible to circumvent this by passing an explicit event time 
> with each test record. (See 
> https://stackoverflow.com/questions/73443812/using-kafka-streams-topologytestdriver-how-to-test-left-join-between-two-strea/73540161#73540161)
>  
> However, the behavior deviates from a real Kafka broker. With a real broker, 
> if we do not send an event, it uses the wall clock time of the broker 
> instead. The behavior under test should be the same: 
> `driver.advanceWallClockTime` should provide the default time to be used for 
> `TestTopic.pipeInput`, when no other time is specified.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10575:

Labels: kip  (was: )

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Fix For: 3.5.0
>
>
> Part of KIP-869: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility]
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transit it to the running state. 
> However the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback indicating its progress when the restoration 
> stopped in any scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10575:

Description: 
Part of KIP-869: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility]

Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete the 
restoration of an active task and transit it to the running state. However the 
restoration can also be stopped when the restoring task gets closed (because it 
gets migrated to another client, for example). We should also trigger the 
callback indicating its progress when the restoration stopped in any scenarios.

  was:Today we only trigger `StateRestoreListener#onRestoreEnd` when we 
complete the restoration of an active task and transit it to the running state. 
However the restoration can also be stopped when the restoring task gets closed 
(because it gets migrated to another client, for example). We should also 
trigger the callback indicating its progress when the restoration stopped in 
any scenarios.


> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 3.5.0
>
>
> Part of KIP-869: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility]
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transit it to the running state. 
> However the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback indicating its progress when the restoration 
> stopped in any scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10575.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 3.5.0
>
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transit it to the running state. 
> However the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback indicating its progress when the restoration 
> stopped in any scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10575:
---

Assignee: Guozhang Wang  (was: highluck)

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transit it to the running state. 
> However the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback indicating its progress when the restoration 
> stopped in any scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on pull request #13744: MINOR: update 3.4.1 licence

2023-05-23 Thread via GitHub


divijvaidya commented on PR #13744:
URL: https://github.com/apache/kafka/pull/13744#issuecomment-1559041622

   Hey @showuon 
   I know that it is unrelated to your specific change but we seem to be 
missing adding license for a bunch of dependencies such as `bcpkix` and `jaxb`. 
Shall we do an audit on whether we have listed all dependencies here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on a diff in pull request #13744: MINOR: update 3.4.1 licence

2023-05-23 Thread via GitHub


machi1990 commented on code in PR #13744:
URL: https://github.com/apache/kafka/pull/13744#discussion_r1202034297


##
LICENSE-binary:
##
@@ -205,53 +205,54 @@
 This project bundles some components that are also licensed under the Apache
 License Version 2.0:
 
-audience-annotations-0.5.0
+audience-annotations-0.13.0
 commons-cli-1.4
 commons-lang3-3.8.1
-jackson-annotations-2.13.4
-jackson-core-2.13.4
-jackson-databind-2.13.4.2
-jackson-dataformat-csv-2.13.4
-jackson-dataformat-yaml-2.13.4
-jackson-datatype-jdk8-2.13.4
-jackson-datatype-jsr310-2.13.4
-jackson-jaxrs-base-2.13.4
-jackson-jaxrs-json-provider-2.13.4
-jackson-module-jaxb-annotations-2.13.4
-jackson-module-scala_2.13-2.13.4
-jackson-module-scala_2.12-2.13.4
+jackson-annotations-2.13.5
+jackson-core-2.13.5
+jackson-databind-2.13.5
+jackson-dataformat-csv-2.13.5
+jackson-dataformat-yaml-2.13.5
+jackson-datatype-jdk8-2.13.5
+jackson-datatype-jsr310-2.13.5
+jackson-jaxrs-base-2.13.5
+jackson-jaxrs-json-provider-2.13.5
+jackson-module-jaxb-annotations-2.13.5
+jackson-module-scala_2.13-2.13.5
+jackson-module-scala_2.12-2.13.5
 jakarta.validation-api-2.0.2
-javassist-3.27.0-GA
-jetty-client-9.4.48.v20220622
-jetty-continuation-9.4.48.v20220622
-jetty-http-9.4.48.v20220622
-jetty-io-9.4.48.v20220622
-jetty-security-9.4.48.v20220622
-jetty-server-9.4.48.v20220622
-jetty-servlet-9.4.48.v20220622
-jetty-servlets-9.4.48.v20220622
-jetty-util-9.4.48.v20220622
-jetty-util-ajax-9.4.48.v20220622
-jersey-common-2.34
-jersey-server-2.34
+javassist-3.29.2-GA
+jetty-client-9.4.51.v20230217
+jetty-continuation-9.4.51.v20230217
+jetty-http-9.4.51.v20230217
+jetty-io-9.4.51.v20230217
+jetty-security-9.4.51.v20230217
+jetty-server-9.4.51.v20230217
+jetty-servlet-9.4.51.v20230217
+jetty-servlets-9.4.51.v20230217
+jetty-util-9.4.51.v20230217
+jetty-util-ajax-9.4.51.v20230217
+jersey-common-2.39.1
+jersey-server-2.39.1
 jose4j-0.9.3
 lz4-java-1.8.0
 maven-artifact-3.8.4
 metrics-core-4.1.12.1
 metrics-core-2.2.0
-netty-buffer-4.1.78.Final
-netty-codec-4.1.78.Final
-netty-common-4.1.78.Final
-netty-handler-4.1.78.Final
-netty-resolver-4.1.78.Final
-netty-transport-4.1.78.Final
-netty-transport-classes-epoll-4.1.78.Final
-netty-transport-native-epoll-4.1.78.Final
-netty-transport-native-unix-common-4.1.78.Final
+netty-buffer-4.1.92.Final
+netty-codec-4.1.92.Final
+netty-common-4.1.92.Final
+netty-handler-4.1.92.Final
+netty-resolver-4.1.92.Final
+netty-transport-4.1.92.Final
+netty-transport-classes-epoll-4.1.92.Final
+netty-transport-native-epoll-4.1.92.Final
+netty-transport-native-unix-common-4.1.92.Final
 plexus-utils-3.3.0
 reload4j-1.2.19
+reload4j-1.2.25

Review Comment:
   ah right. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13744: MINOR: update 3.4.1 licence

2023-05-23 Thread via GitHub


showuon commented on code in PR #13744:
URL: https://github.com/apache/kafka/pull/13744#discussion_r1202033613


##
LICENSE-binary:
##
@@ -205,53 +205,54 @@
 This project bundles some components that are also licensed under the Apache
 License Version 2.0:
 
-audience-annotations-0.5.0
+audience-annotations-0.13.0
 commons-cli-1.4
 commons-lang3-3.8.1
-jackson-annotations-2.13.4
-jackson-core-2.13.4
-jackson-databind-2.13.4.2
-jackson-dataformat-csv-2.13.4
-jackson-dataformat-yaml-2.13.4
-jackson-datatype-jdk8-2.13.4
-jackson-datatype-jsr310-2.13.4
-jackson-jaxrs-base-2.13.4
-jackson-jaxrs-json-provider-2.13.4
-jackson-module-jaxb-annotations-2.13.4
-jackson-module-scala_2.13-2.13.4
-jackson-module-scala_2.12-2.13.4
+jackson-annotations-2.13.5
+jackson-core-2.13.5
+jackson-databind-2.13.5
+jackson-dataformat-csv-2.13.5
+jackson-dataformat-yaml-2.13.5
+jackson-datatype-jdk8-2.13.5
+jackson-datatype-jsr310-2.13.5
+jackson-jaxrs-base-2.13.5
+jackson-jaxrs-json-provider-2.13.5
+jackson-module-jaxb-annotations-2.13.5
+jackson-module-scala_2.13-2.13.5
+jackson-module-scala_2.12-2.13.5
 jakarta.validation-api-2.0.2
-javassist-3.27.0-GA
-jetty-client-9.4.48.v20220622
-jetty-continuation-9.4.48.v20220622
-jetty-http-9.4.48.v20220622
-jetty-io-9.4.48.v20220622
-jetty-security-9.4.48.v20220622
-jetty-server-9.4.48.v20220622
-jetty-servlet-9.4.48.v20220622
-jetty-servlets-9.4.48.v20220622
-jetty-util-9.4.48.v20220622
-jetty-util-ajax-9.4.48.v20220622
-jersey-common-2.34
-jersey-server-2.34
+javassist-3.29.2-GA
+jetty-client-9.4.51.v20230217
+jetty-continuation-9.4.51.v20230217
+jetty-http-9.4.51.v20230217
+jetty-io-9.4.51.v20230217
+jetty-security-9.4.51.v20230217
+jetty-server-9.4.51.v20230217
+jetty-servlet-9.4.51.v20230217
+jetty-servlets-9.4.51.v20230217
+jetty-util-9.4.51.v20230217
+jetty-util-ajax-9.4.51.v20230217
+jersey-common-2.39.1
+jersey-server-2.39.1
 jose4j-0.9.3
 lz4-java-1.8.0
 maven-artifact-3.8.4
 metrics-core-4.1.12.1
 metrics-core-2.2.0
-netty-buffer-4.1.78.Final
-netty-codec-4.1.78.Final
-netty-common-4.1.78.Final
-netty-handler-4.1.78.Final
-netty-resolver-4.1.78.Final
-netty-transport-4.1.78.Final
-netty-transport-classes-epoll-4.1.78.Final
-netty-transport-native-epoll-4.1.78.Final
-netty-transport-native-unix-common-4.1.78.Final
+netty-buffer-4.1.92.Final
+netty-codec-4.1.92.Final
+netty-common-4.1.92.Final
+netty-handler-4.1.92.Final
+netty-resolver-4.1.92.Final
+netty-transport-4.1.92.Final
+netty-transport-classes-epoll-4.1.92.Final
+netty-transport-native-epoll-4.1.92.Final
+netty-transport-native-unix-common-4.1.92.Final
 plexus-utils-3.3.0
 reload4j-1.2.19
+reload4j-1.2.25

Review Comment:
   Yes, this is a known issue: 
https://issues.apache.org/jira/browse/KAFKA-15015 . 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on a diff in pull request #13744: MINOR: update 3.4.1 licence

2023-05-23 Thread via GitHub


machi1990 commented on code in PR #13744:
URL: https://github.com/apache/kafka/pull/13744#discussion_r1202030847


##
LICENSE-binary:
##
@@ -205,53 +205,54 @@
 This project bundles some components that are also licensed under the Apache
 License Version 2.0:
 
-audience-annotations-0.5.0
+audience-annotations-0.13.0
 commons-cli-1.4
 commons-lang3-3.8.1
-jackson-annotations-2.13.4
-jackson-core-2.13.4
-jackson-databind-2.13.4.2
-jackson-dataformat-csv-2.13.4
-jackson-dataformat-yaml-2.13.4
-jackson-datatype-jdk8-2.13.4
-jackson-datatype-jsr310-2.13.4
-jackson-jaxrs-base-2.13.4
-jackson-jaxrs-json-provider-2.13.4
-jackson-module-jaxb-annotations-2.13.4
-jackson-module-scala_2.13-2.13.4
-jackson-module-scala_2.12-2.13.4
+jackson-annotations-2.13.5
+jackson-core-2.13.5
+jackson-databind-2.13.5
+jackson-dataformat-csv-2.13.5
+jackson-dataformat-yaml-2.13.5
+jackson-datatype-jdk8-2.13.5
+jackson-datatype-jsr310-2.13.5
+jackson-jaxrs-base-2.13.5
+jackson-jaxrs-json-provider-2.13.5
+jackson-module-jaxb-annotations-2.13.5
+jackson-module-scala_2.13-2.13.5
+jackson-module-scala_2.12-2.13.5
 jakarta.validation-api-2.0.2
-javassist-3.27.0-GA
-jetty-client-9.4.48.v20220622
-jetty-continuation-9.4.48.v20220622
-jetty-http-9.4.48.v20220622
-jetty-io-9.4.48.v20220622
-jetty-security-9.4.48.v20220622
-jetty-server-9.4.48.v20220622
-jetty-servlet-9.4.48.v20220622
-jetty-servlets-9.4.48.v20220622
-jetty-util-9.4.48.v20220622
-jetty-util-ajax-9.4.48.v20220622
-jersey-common-2.34
-jersey-server-2.34
+javassist-3.29.2-GA
+jetty-client-9.4.51.v20230217
+jetty-continuation-9.4.51.v20230217
+jetty-http-9.4.51.v20230217
+jetty-io-9.4.51.v20230217
+jetty-security-9.4.51.v20230217
+jetty-server-9.4.51.v20230217
+jetty-servlet-9.4.51.v20230217
+jetty-servlets-9.4.51.v20230217
+jetty-util-9.4.51.v20230217
+jetty-util-ajax-9.4.51.v20230217
+jersey-common-2.39.1
+jersey-server-2.39.1
 jose4j-0.9.3
 lz4-java-1.8.0
 maven-artifact-3.8.4
 metrics-core-4.1.12.1
 metrics-core-2.2.0
-netty-buffer-4.1.78.Final
-netty-codec-4.1.78.Final
-netty-common-4.1.78.Final
-netty-handler-4.1.78.Final
-netty-resolver-4.1.78.Final
-netty-transport-4.1.78.Final
-netty-transport-classes-epoll-4.1.78.Final
-netty-transport-native-epoll-4.1.78.Final
-netty-transport-native-unix-common-4.1.78.Final
+netty-buffer-4.1.92.Final
+netty-codec-4.1.92.Final
+netty-common-4.1.92.Final
+netty-handler-4.1.92.Final
+netty-resolver-4.1.92.Final
+netty-transport-4.1.92.Final
+netty-transport-classes-epoll-4.1.92.Final
+netty-transport-native-epoll-4.1.92.Final
+netty-transport-native-unix-common-4.1.92.Final
 plexus-utils-3.3.0
 reload4j-1.2.19
+reload4j-1.2.25

Review Comment:
   duplicate dependency?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14539) Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

2023-05-23 Thread Danica Fine (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danica Fine reassigned KAFKA-14539:
---

Assignee: Danica Fine

> Simplify StreamsMetadataState by replacing the Cluster metadata with 
> partition info map
> ---
>
> Key: KAFKA-14539
> URL: https://issues.apache.org/jira/browse/KAFKA-14539
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Danica Fine
>Priority: Major
>
> We can clean up the StreamsMetadataState class a bit by removing the 
> #onChange invocation that currently occurs within 
> StreamsPartitionAssignor#assign, which then lets us remove the `Cluster` 
> parameter in that callback. Instead of building a fake Cluster object from 
> the map of partition info when we invoke #onChange inside the 
> StreamsPartitionAssignor#onAssignment method, we can just directly pass in 
> the  `Map` and replace the usage of `Cluster` 
> everywhere in StreamsMetadataState
> (I believe the current system is a historical artifact from when we used to 
> require passing in a {{Cluster}} for the default partitioning strategy, which 
> the StreamMetadataState needs to compute the partition for a key. At some 
> point in the past we provided a better way to get the default partition, so 
> we no longer need a {{Cluster}} parameter/field at all)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on pull request #13744: MINOR: update 3.4.1 licence

2023-05-23 Thread via GitHub


showuon commented on PR #13744:
URL: https://github.com/apache/kafka/pull/13744#issuecomment-1559011060

   @mimaison , call for review. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon opened a new pull request, #13744: MINOR: update 3.4.1 licence

2023-05-23 Thread via GitHub


showuon opened a new pull request, #13744:
URL: https://github.com/apache/kafka/pull/13744

   update v3.4.1 licence
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13708: KAFKA-14500; [4/N] Add Timer interface

2023-05-23 Thread via GitHub


dajac merged PR #13708:
URL: https://github.com/apache/kafka/pull/13708


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison closed pull request #13581: A document on the current usage of Kafka project is added

2023-05-23 Thread via GitHub


mimaison closed pull request #13581: A document on the current usage of Kafka 
project is added
URL: https://github.com/apache/kafka/pull/13581


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13581: A document on the current usage of Kafka project is added

2023-05-23 Thread via GitHub


mimaison commented on PR #13581:
URL: https://github.com/apache/kafka/pull/13581#issuecomment-1558820436

   Because it's opened against the 3.5 branch, this PR is currently preventing 
us to add the 3.5 branch to the Apache Kafka Jenkins job. So I'll close the PR 
temporarily while I fix the Jenkins job. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

2023-05-23 Thread via GitHub


dajac merged PR #13704:
URL: https://github.com/apache/kafka/pull/13704


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15015) Binaries contain 2 versions of reload4j

2023-05-23 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725278#comment-17725278
 ] 

Mickael Maison commented on KAFKA-15015:


I don't think it's a blocker for 3.4.1 or 3.5.0 so we don't necessarily need 
the fix immediately.

> Binaries contain 2 versions of reload4j
> ---
>
> Key: KAFKA-15015
> URL: https://issues.apache.org/jira/browse/KAFKA-15015
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0, 3.4.1
>Reporter: Mickael Maison
>Priority: Major
>
> These releases ship 2 versions of reload4j:
> - reload4j-1.2.19.jar
> - reload4j-1.2.25.jar



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] akatona84 commented on pull request #13733: KAFKA-13337: fix of possible java.nio.file.AccessDeniedException during Connect plugin directory scan

2023-05-23 Thread via GitHub


akatona84 commented on PR #13733:
URL: https://github.com/apache/kafka/pull/13733#issuecomment-1558629671

   > Is there any way to decide if a specific file/dir is meant to be a plugin?
   currently the code is only checking whether it is a dir or the extension is 
zip, jar or class. Yet for an unreadable one, you can't decide anything, even 
these. Actual plugin scanning is done bit later
   
   Issues can be:
   io exceptions - reading problems
   url/path problems - bad paths were given (don't see other possible cases)
   
   It would be a breaking change to prevent Connect to start in case of errors 
like these.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-23 Thread via GitHub


dajac commented on PR #13535:
URL: https://github.com/apache/kafka/pull/13535#issuecomment-1558604752

   @satishd `testResponseIncludesLeaderEpoch` fails locally. Does it pass for 
you? It does not seem to be related to slow CI.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >