[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-18 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1266270587


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -301,10 +302,12 @@ public CompletableFuture<JoinGroupResponseData> joinGroup(
 topicPartitionFor(request.groupId()),
 coordinator -> coordinator.genericGroupJoin(context, request, responseFuture)
 ).exceptionally(exception -> {
-if (!responseFuture.isDone()) {
+if (!(exception instanceof KafkaException)) {
 log.error("Request {} hit an unexpected exception: {}",

Review Comment:
   nit: Could we say `JoinGroup request {} hit`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-18 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1266269140


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1245,4 +1422,1304 @@ public static String consumerGroupSessionTimeoutKey(String groupId, String membe
 public static String consumerGroupRevocationTimeoutKey(String groupId, String memberId) {
 return "revocation-timeout-" + groupId + "-" + memberId;
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should be removed.
+removeGroup(groupId);
+} else {
+List<GenericGroupMember> loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = member.rebalanceTimeout() == -1 ?
+member.sessionTimeout() : member.rebalanceTimeout();
+
+JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection();
+supportedProtocols.add(new JoinGroupRequestProtocol()
+.setName(value.protocol())
+.setMetadata(member.subscription()));
+
+GenericGroupMember loadedMember = new GenericGroupMember(
+member.memberId(),
+Optional.ofNullable(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+rebalanceTimeout,
+member.sessionTimeout(),
+value.protocolType(),
+supportedProtocols,
+member.assignment()
+);
+
+loadedMembers.add(loadedMember);
+}
+
+String protocolType = value.protocolType();
+
+GenericGroup genericGroup = new GenericGroup(
+this.logContext,
+groupId,
+loadedMembers.isEmpty() ? EMPTY : STABLE,
+time,
+value.generation(),
+protocolType == null || protocolType.isEmpty() ? Optional.empty() : Optional.of(protocolType),
+Optional.ofNullable(value.protocol()),
+Optional.ofNullable(value.leader()),
+value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp())
+);
+
+loadedMembers.forEach(member -> genericGroup.add(member, null));
+groups.put(groupId, genericGroup);
+
+genericGroup.setSubscribedTopics(
+genericGroup.computeSubscribedTopics()
+);
+}
+}
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group phase completes.
+ */
+public CoordinatorResult genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture<JoinGroupResponseData> responseFuture
+) {
+CoordinatorResult result = EMPTY_RESULT;
+
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is UNKNOWN. if member
+// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND
+GenericGroup group;
+boolean isNewGroup = !groups.containsKey(groupId);
+try {
+group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return EMPTY_RESULT;
+}
+
+if (!acceptJoiningMember(group, memberId)) {
+group.remove(memberId);
+responseFuture.complete(new JoinGroupResponseData()
+ 

[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-18 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1266267667


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -531,7 +531,12 @@ class BrokerServer(
 config.consumerGroupHeartbeatIntervalMs,
 config.consumerGroupMaxSize,
 config.consumerGroupAssignors,
-config.offsetsTopicSegmentBytes
+config.offsetsTopicSegmentBytes,
+config.groupMaxSize,
+config.groupInitialRebalanceDelay,
+5 * 60 * 1000,

Review Comment:
   Should we replace this by a constant if we can't change it based on config?
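
   A minimal Java sketch of what this suggestion could look like: the inline literal gets a name so its unit is visible at the call site. The class name and constant name are hypothetical; the thread does not say what the value configures, so the name below is deliberately neutral.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder class; neither name below comes from the PR.
final class TimeoutDefaults {
    // Same value as the inline literal 5 * 60 * 1000: five minutes, in milliseconds.
    static final int EXAMPLE_TIMEOUT_MS = (int) TimeUnit.MINUTES.toMillis(5);

    private TimeoutDefaults() { }
}
```

   The call site would then pass `TimeoutDefaults.EXAMPLE_TIMEOUT_MS` in place of the arithmetic expression.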






[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-17 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1266264686


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##

[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-17 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1265807612


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -3022,143 +3242,2087 @@ public void testOnLoaded() {
 assertNotNull(context.timer.timeout(consumerGroupRevocationTimeoutKey("foo", "foo-1")));
 }
 
-private <T> void assertUnorderedListEquals(
-List<T> expected,
-List<T> actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
-
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
-}
-}
+@Test
+public void testGenerateRecordsOnNewGroup() throws Exception {
+GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+.build();
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), actual.assignment());
-}
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>();
+CoordinatorResult result = context.sendGenericGroupJoin(request, responseFuture, true);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.MEMBER_ID_REQUIRED.code(), responseFuture.get().errorCode());
 
-if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), fromAssignment(actual.pendingTopicPartitions())))
-return false;
+GenericGroup group = context.createGenericGroup("group-id");
 
-return Objects.equals(fromAssignment(expected.assignedTopicPartitions()), fromAssignment(actual.assignedTopicPartitions()));
+assertEquals(
+Collections.singletonList(RecordHelpers.newEmptyGroupMetadataRecord(group, MetadataVersion.latest())),
+result.records()
+);
 }
 
-private Map<Uuid, Set<Integer>> fromAssignment(
-List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment
-) {
-if (assignment == null) return null;
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception {
+GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
+context.createGenericGroup("group-id");
 
-Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
-assignment.forEach(topicPartitions -> {
-assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions()));
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+IntStream.range(0, 10).forEach(i -> {
+CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>();
+CoordinatorResult result = context.sendGenericGroupJoin(request, responseFuture);
+assertFalse(responseFuture.isDone());
+assertTrue(result.records().isEmpty());
 });
-return assignmentMap;
+
+CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>();
+CoordinatorResult result = context.sendGenericGroupJoin(request, responseFuture);
+assertTrue(result.records().isEmpty());
+assertTrue(responseFuture.isDone());
+

[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-17 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1265807220


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -299,10 +299,10 @@ public CompletableFuture<JoinGroupResponseData> joinGroup(
 topicPartitionFor(request.groupId()),
 coordinator -> coordinator.genericGroupJoin(context, request, responseFuture)
 ).exceptionally(exception -> {
-log.error("Request {} hit an unexpected exception: {}",
-request, exception.getMessage());
-
 if (!responseFuture.isDone()) {
+log.error("Request {} hit an unexpected exception: {}",

Review Comment:
   This would still log in expected cases, no? For instance, when the 
coordinator for the group is inactive, loading, etc. If you really want to log 
something, you could perhaps log only if `exception` is not a KafkaException or 
only when it is a RuntimeException for instance.
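
   A self-contained Java sketch of the filtering idea described above: log only when the failure is not one of the expected coordinator errors. The nested `KafkaException` is a stand-in so the example compiles without Kafka on the classpath, and the class and method names are hypothetical. Unwrapping `CompletionException` is an addition of this sketch, assumed to be useful when the handler sits behind intermediate stages; the PR may not need it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ExceptionLoggingSketch {
    // Stand-in for org.apache.kafka.common.KafkaException (assumption: keeps the sketch standalone).
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    // Dependent stages may receive the cause wrapped in a CompletionException, so unwrap one level.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Expected errors (coordinator inactive, loading, ...) are modeled as KafkaException here;
    // anything else is treated as unexpected and worth logging.
    static boolean isUnexpected(Throwable t) {
        return !(unwrap(t) instanceof KafkaException);
    }

    // Attaches the filter to a future and returns the stage produced by exceptionally().
    static CompletableFuture<String> withFilteredLogging(CompletableFuture<String> future) {
        return future.exceptionally(exception -> {
            if (isUnexpected(exception)) {
                System.err.println("JoinGroup request hit an unexpected exception: "
                    + exception.getMessage());
            }
            return null;
        });
    }
}
```

   With this shape, a future failed with the stand-in `KafkaException` passes through silently, while something like an `IllegalStateException` is reported.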






[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-17 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264968675


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() {
 
 // Verify the groups.
 Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> {
-ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 Arrays.asList("group5").forEach(groupId -> {
-ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 // Verify image.
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private <T> void assertUnorderedListEquals(
-List<T> expected,
-List<T> actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception {
+GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture<JoinGroupResponseData> responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture<JoinGroupResponseData> responseFuture = context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() {
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if 

[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-17 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264967746


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -266,9 +282,32 @@ public CompletableFuture<JoinGroupResponseData> joinGroup(
 return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>();
+
+if (!isGroupIdNotEmpty(request.groupId())) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(request.memberId())
+.setErrorCode(Errors.INVALID_GROUP_ID.code()));
+
+return responseFuture;
+}
+
+runtime.scheduleWriteOperation("generic-group-join",
+topicPartitionFor(request.groupId()),
+coordinator -> coordinator.genericGroupJoin(context, request, responseFuture)
+).exceptionally(exception -> {
+log.error("Request {} hit an unexpected exception: {}",
+request, exception.getMessage());

Review Comment:
   This was not addressed.






[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-17 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264965443


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -561,7 +625,7 @@ public boolean hasReceivedSyncFromAllMembers() {
  * @return members that have yet to sync.
  */
 public Set allPendingSyncMembers() {
-return pendingSyncMembers;
+return new HashSet<>(pendingSyncMembers);

Review Comment:
   This was not addressed.
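
   For context on the hunk above, a small Java sketch of the defensive-copy pattern it introduces: returning `new HashSet<>(...)` hands callers a snapshot instead of the live internal set. The class name, the `addPending` helper, and the `String` element type are assumptions for illustration; the quoted diff shows only a raw `Set`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the group class; only the accessor mirrors the diff.
final class PendingSyncSketch {
    private final Set<String> pendingSyncMembers = new HashSet<>();

    // Illustrative helper, not taken from the PR.
    void addPending(String memberId) {
        pendingSyncMembers.add(memberId);
    }

    // Returns a copy, so callers can iterate or mutate the result
    // without affecting the group's internal state.
    Set<String> allPendingSyncMembers() {
        return new HashSet<>(pendingSyncMembers);
    }
}
```

   Clearing or modifying the returned set leaves the group's own bookkeeping untouched, which matters if a caller removes members while iterating.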






[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-17 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264964347


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -579,4 +618,32 @@ public void shutdown() {
 Utils.closeQuietly(runtime, "coordinator runtime");
 log.info("Shutdown complete.");
 }
+
+private static boolean isGroupIdNotEmpty(String groupId) {
+return groupId != null && !groupId.isEmpty();
+}
+
+private static Errors toResponseError(Errors appendError) {
+switch (appendError) {
+case UNKNOWN_TOPIC_OR_PARTITION:
+case NOT_ENOUGH_REPLICAS:
+case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
+return COORDINATOR_NOT_AVAILABLE;
+
+case NOT_LEADER_OR_FOLLOWER:
+case KAFKA_STORAGE_ERROR:
+return NOT_COORDINATOR;
+
+case REQUEST_TIMED_OUT:

Review Comment:
   This request timeout was coming from the delayed produce op in the 
purgatory. We don't have this anymore.






[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-17 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264962406


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -5235,6 +5316,11 @@ private List verifyGenericGroupJoinResponses(
 return memberIds;
 }
 
+private void assertEmptyResult(List> timeouts) {

Review Comment:
   nit: `assertNoOrEmptyResult`?






[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-17 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264961072


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-17 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264955867


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1243,4 +1406,1283 @@ public static String consumerGroupSessionTimeoutKey(String groupId, String membe
 public static String consumerGroupRevocationTimeoutKey(String groupId, String memberId) {
 return "revocation-timeout-" + groupId + "-" + memberId;
 }
+
+ /** Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should be removed.
+groups.remove(groupId);
+} else {
+List<GenericGroupMember> loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = member.rebalanceTimeout() == -1 ?
+member.sessionTimeout() : member.rebalanceTimeout();
+
+JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection();
+supportedProtocols.add(new JoinGroupRequestProtocol()
+.setName(value.protocol())
+.setMetadata(member.subscription()));
+
+GenericGroupMember loadedMember = new GenericGroupMember(
+member.memberId(),
+Optional.ofNullable(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+rebalanceTimeout,
+member.sessionTimeout(),
+value.protocolType(),
+supportedProtocols,
+member.assignment()
+);
+
+loadedMembers.add(loadedMember);
+}
+
+String protocolType = value.protocolType();
+
+GenericGroup genericGroup = new GenericGroup(
+this.logContext,
+groupId,
+loadedMembers.isEmpty() ? EMPTY : STABLE,
+time,
+value.generation(),
+protocolType == null || protocolType.isEmpty() ? Optional.empty() : Optional.of(protocolType),
+Optional.ofNullable(value.protocol()),
+Optional.ofNullable(value.leader()),
+value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp())
+);
+
+loadedMembers.forEach(member -> {
+genericGroup.add(member, null);
+log.info("Loaded member {} in group {} with generation {}.",
+member.memberId(), groupId, genericGroup.generationId());
+});
+
+genericGroup.setSubscribedTopics(
+genericGroup.computeSubscribedTopics()
+);
+}
+}
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group phase completes.
+ */
+public CoordinatorResult genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture<JoinGroupResponseData> responseFuture
+) {
+CoordinatorResult result = EMPTY_RESULT;
+
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is UNKNOWN. if member
+// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND
+GenericGroup group;
+boolean isNewGroup = !groups.containsKey(groupId);
+try {
+group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return EMPTY_RESULT;
+}
+
+if (!acceptJoiningMember(group, 

[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-17 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264954924


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1243,4 +1406,1283 @@ public static String 
consumerGroupSessionTimeoutKey(String groupId, String membe
 public static String consumerGroupRevocationTimeoutKey(String groupId, 
String memberId) {
 return "revocation-timeout-" + groupId + "-" + memberId;
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should be removed.
+groups.remove(groupId);
+} else {
+List loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = member.rebalanceTimeout() == -1 ?
+member.sessionTimeout() : member.rebalanceTimeout();
+
+JoinGroupRequestProtocolCollection supportedProtocols = new 
JoinGroupRequestProtocolCollection();
+supportedProtocols.add(new JoinGroupRequestProtocol()
+.setName(value.protocol())
+.setMetadata(member.subscription()));
+
+GenericGroupMember loadedMember = new GenericGroupMember(
+member.memberId(),
+Optional.ofNullable(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+rebalanceTimeout,
+member.sessionTimeout(),
+value.protocolType(),
+supportedProtocols,
+member.assignment()
+);
+
+loadedMembers.add(loadedMember);
+}
+
+String protocolType = value.protocolType();
+
+GenericGroup genericGroup = new GenericGroup(
+this.logContext,
+groupId,
+loadedMembers.isEmpty() ? EMPTY : STABLE,
+time,
+value.generation(),
+protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType),
+Optional.ofNullable(value.protocol()),
+Optional.ofNullable(value.leader()),
+value.currentStateTimestamp() == -1 ? Optional.empty() : 
Optional.of(value.currentStateTimestamp())
+);
+
+loadedMembers.forEach(member -> {
+genericGroup.add(member, null);
+log.info("Loaded member {} in group {} with generation {}.",
+member.memberId(), groupId, genericGroup.generationId());
+});
+
+genericGroup.setSubscribedTopics(
+genericGroup.computeSubscribedTopics()
+);
+}
+}
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) {
+CoordinatorResult result = EMPTY_RESULT;
+
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+boolean isNewGroup = !groups.containsKey(groupId);
+try {
+group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return EMPTY_RESULT;
+}
+
+if (!acceptJoiningMember(group, 

[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-17 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264953365


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -247,31 +324,78 @@ GroupMetadataManager build() {
  */
 private MetadataImage metadataImage;
 
+/**
+ * An empty result returned to the state machine. This means that
+ * there are no records to append to the log.
+ *
+ * Package private for testing.
+ */
+static final CoordinatorResult EMPTY_RESULT =
+new CoordinatorResult<>(Collections.emptyList(), CompletableFuture.completedFuture(null));

Review Comment:
   ah.. did not think about that.






[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-30 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1247814090


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -266,9 +295,21 @@ public CompletableFuture joinGroup(
 return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+CompletableFuture responseFuture = new CompletableFuture<>();
+
+if (!isValidGroupId(request.groupId(), ApiKeys.forId(request.apiKey()))) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(request.memberId())
+.setErrorCode(Errors.INVALID_GROUP_ID.code()));
+
+return responseFuture;
+}
+
+runtime.scheduleWriteOperation("generic-group-join",

Review Comment:
   I wonder if we need to handle the future returned by 
`scheduleWriteOperation` as well. At minimum, we may want to react to errors. 
This could, for instance, happen if something goes wrong before the join group 
handling event is triggered.
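   A minimal sketch of the idea, not code from the PR: it assumes the scheduling call returns a `CompletableFuture` and uses a hypothetical `Response` class in place of `JoinGroupResponseData`. If scheduling fails before the handler runs, the failure is forwarded to the response future so the caller is not left waiting.

```java
import java.util.concurrent.CompletableFuture;

final class JoinFutureSketch {
    // Hypothetical stand-in for the response type; only the error name matters here.
    static final class Response {
        final String error;
        Response(String error) { this.error = error; }
    }

    // Links the scheduling future to the response future.
    static CompletableFuture<Response> join(CompletableFuture<Void> scheduled) {
        CompletableFuture<Response> responseFuture = new CompletableFuture<>();
        scheduled.exceptionally(exception -> {
            // complete() does nothing if the handler already completed the response.
            responseFuture.complete(new Response(exception.getClass().getSimpleName()));
            return null;
        });
        return responseFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> scheduled = new CompletableFuture<>();
        CompletableFuture<Response> response = join(scheduled);
        // Simulate a failure that happens before the join handler is invoked.
        scheduled.completeExceptionally(new IllegalStateException("scheduling failed"));
        System.out.println("response done: " + response.isDone());
    }
}
```

   Because `complete` only takes effect once, a response already produced by the handler is never overwritten by a later scheduling error.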



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -578,4 +619,19 @@ public void shutdown() {
 Utils.closeQuietly(runtime, "coordinator runtime");
 log.info("Shutdown complete.");
 }
+
+private boolean isValidGroupId(String groupId, ApiKeys api) {
+if (api == ApiKeys.OFFSET_COMMIT ||
+api == ApiKeys.OFFSET_FETCH ||
+api == ApiKeys.DESCRIBE_GROUPS ||
+api == ApiKeys.DELETE_GROUPS

Review Comment:
   nit: I am not a fan of this validation. I wonder if we should just have two 
helpers: `isGroupIdNotNull` and `isGroupIdNotEmpty`. In this PR, we would only 
need `isGroupIdNotEmpty`. What do you think?
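   A rough sketch of what the two helpers could look like. Only the names `isGroupIdNotNull` and `isGroupIdNotEmpty` come from the comment above; the bodies and the enclosing `GroupIdChecks` class are assumptions.

```java
final class GroupIdChecks {
    // For APIs that tolerate an empty group id but not a missing one.
    static boolean isGroupIdNotNull(String groupId) {
        return groupId != null;
    }

    // For APIs such as JoinGroup that need a usable, non-empty group id.
    static boolean isGroupIdNotEmpty(String groupId) {
        return groupId != null && !groupId.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(isGroupIdNotNull(""));
        System.out.println(isGroupIdNotEmpty(""));
        System.out.println(isGroupIdNotEmpty("my-group"));
    }
}
```

   Each call site then picks the check it needs, so no helper has to branch on the API key.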



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1087,1348 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value,
+short version
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should not be added.
+} else {
+List loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = version == 0 ? member.sessionTimeout() : member.rebalanceTimeout();

Review Comment:
   I wonder if we could avoid passing the version to this method by adding `-1` 
as the default value of `rebalanceTimeout` in `GroupMetadataValue`. It seems 
that we could rely on this to decide here.
   
   Another way that I was thinking about would be to pass the `Record` to the 
replay method as it contains all the available information. Have you considered 
this?
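   A small sketch of the first option, assuming `-1` is the default written when the field is absent. The class and method names here are hypothetical. The sentinel alone decides whether to fall back to the session timeout, so no version argument is needed.

```java
final class RebalanceTimeoutSketch {
    // Assumed sentinel meaning "the record did not carry a rebalance timeout".
    static final int UNSET = -1;

    static int effectiveRebalanceTimeout(int rebalanceTimeout, int sessionTimeout) {
        // Records without the field fall back to the session timeout.
        return rebalanceTimeout == UNSET ? sessionTimeout : rebalanceTimeout;
    }

    public static void main(String[] args) {
        System.out.println(effectiveRebalanceTimeout(UNSET, 10_000));
        System.out.println(effectiveRebalanceTimeout(30_000, 10_000));
    }
}
```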



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1087,1348 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value,
+short version
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should not be added.

Review Comment:
   I think that the group should be deleted in this case.
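   To illustrate, here is a simplified sketch that uses a plain `HashMap` and an `Integer` generation in place of the real record types (both are assumptions). A null value is treated as a tombstone and removes the group from the soft state.

```java
import java.util.HashMap;
import java.util.Map;

final class TombstoneReplaySketch {
    // groupId -> generation; a simplified stand-in for the group soft state.
    private final Map<String, Integer> groups = new HashMap<>();

    void replay(String groupId, Integer generation) {
        if (generation == null) {
            // Tombstone: the group must be deleted, not merely skipped.
            groups.remove(groupId);
        } else {
            groups.put(groupId, generation);
        }
    }

    boolean contains(String groupId) {
        return groups.containsKey(groupId);
    }

    public static void main(String[] args) {
        TombstoneReplaySketch state = new TombstoneReplaySketch();
        state.replay("group-a", 5);
        state.replay("group-a", null);
        System.out.println("group-a present: " + state.contains("group-a"));
    }
}
```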



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -266,9 +295,21 @@ public CompletableFuture joinGroup(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+CompletableFuture responseFuture = new 
CompletableFuture<>();
+
+if (!isValidGroupId(request.groupId(), ApiKeys.forId(request.apiKey()))) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(request.memberId())
+.setErrorCode(Errors.INVALID_GROUP_ID.code()));
+
+return responseFuture;
+}
+
+runtime.scheduleGenericGroupOperation("generic-group-join",

[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-29 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1246778958


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1072,1338 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value,
+short version
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should not be added.
+// TODO: this needs to be checked in conjunction with empty group 
offsets.
+//if (groups.containsKey(groupId)) {
+//throw new IllegalStateException("Unexpected unload of active 
group " + groupId +
+//"while loading partition " + topicPartition);
+//}
+} else {
+List loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = version == 0 ? member.sessionTimeout() 
: member.rebalanceTimeout();
+
+JoinGroupRequestProtocolCollection supportedProtocols = new 
JoinGroupRequestProtocolCollection();
+supportedProtocols.add(new JoinGroupRequestProtocol()
+.setName(value.protocol())
+.setMetadata(member.subscription()));
+
+GenericGroupMember loadedMember = new GenericGroupMember(
+member.memberId(),
+Optional.ofNullable(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+rebalanceTimeout,
+member.sessionTimeout(),
+value.protocolType(),
+supportedProtocols,
+member.assignment()
+);
+
+loadedMembers.add(loadedMember);
+}
+
+String protocolType = value.protocolType();
+
+GenericGroup genericGroup = new GenericGroup(
+this.logContext,
+groupId,
+loadedMembers.isEmpty() ? EMPTY : STABLE,
+time,
+value.generation(),
+protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType),
+Optional.ofNullable(value.protocol()),
+Optional.ofNullable(value.leader()),
+value.currentStateTimestamp() == -1 ? Optional.empty() : 
Optional.of(value.currentStateTimestamp())
+);
+
+loadedMembers.forEach(member -> {
+genericGroup.add(member, null);
+log.info("Loaded member {} in group {} with generation {}.",
+member.memberId(), groupId, genericGroup.generationId());
+});
+
+genericGroup.setSubscribedTopics(
+genericGroup.computeSubscribedTopics()
+);
+}
+}
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult, Record> genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) {
+CoordinatorResult, Record> result = 
EMPTY_RESULT;
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+try {
+group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return 

[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-27 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1243806647


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1072,1338 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value,
+short version
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should not be added.
+// TODO: this needs to be checked in conjunction with empty group 
offsets.
+//if (groups.containsKey(groupId)) {
+//throw new IllegalStateException("Unexpected unload of active 
group " + groupId +
+//"while loading partition " + topicPartition);
+//}
+} else {
+List loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = version == 0 ? member.sessionTimeout() 
: member.rebalanceTimeout();
+
+JoinGroupRequestProtocolCollection supportedProtocols = new 
JoinGroupRequestProtocolCollection();
+supportedProtocols.add(new JoinGroupRequestProtocol()
+.setName(value.protocol())
+.setMetadata(member.subscription()));
+
+GenericGroupMember loadedMember = new GenericGroupMember(
+member.memberId(),
+Optional.ofNullable(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+rebalanceTimeout,
+member.sessionTimeout(),
+value.protocolType(),
+supportedProtocols,
+member.assignment()
+);
+
+loadedMembers.add(loadedMember);
+}
+
+String protocolType = value.protocolType();
+
+GenericGroup genericGroup = new GenericGroup(
+this.logContext,
+groupId,
+loadedMembers.isEmpty() ? EMPTY : STABLE,
+time,
+value.generation(),
+protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType),
+Optional.ofNullable(value.protocol()),
+Optional.ofNullable(value.leader()),
+value.currentStateTimestamp() == -1 ? Optional.empty() : 
Optional.of(value.currentStateTimestamp())
+);
+
+loadedMembers.forEach(member -> {
+genericGroup.add(member, null);
+log.info("Loaded member {} in group {} with generation {}.",
+member.memberId(), groupId, genericGroup.generationId());
+});
+
+genericGroup.setSubscribedTopics(
+genericGroup.computeSubscribedTopics()
+);
+}
+}
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult, Record> genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) {
+CoordinatorResult, Record> result = 
EMPTY_RESULT;
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+try {
+group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return 

[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-22 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1238725344


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -266,9 +295,21 @@ public CompletableFuture joinGroup(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+CompletableFuture responseFuture = new 
CompletableFuture<>();
+
+if (!isValidGroupId(request.groupId(), ApiKeys.forId(request.apiKey()))) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(request.memberId())
+.setErrorCode(Errors.INVALID_GROUP_ID.code()));
+
+return responseFuture;
+}
+
+runtime.scheduleGenericGroupOperation("generic-group-join",
+topicPartitionFor(request.groupId()),
+coordinator -> coordinator.genericGroupJoin(context, request, 
responseFuture));
+
+return responseFuture;

Review Comment:
   We probably need to convert some of the exceptions like I did for the 
consumer group heartbeat request.
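   A hedged sketch of what such a conversion might look like. The mapping below is purely illustrative: it uses JDK exception types and string error names rather than the actual Kafka exceptions or the conversion used for the heartbeat request. The point is only that internal failures are unwrapped and translated before they reach the client.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

final class ErrorConversionSketch {
    // Hypothetical mapping from internal failures to client-facing error names.
    static String toClientError(Throwable t) {
        // Futures often wrap the real failure in a CompletionException.
        Throwable cause = (t instanceof CompletionException && t.getCause() != null)
            ? t.getCause()
            : t;
        if (cause instanceof TimeoutException) {
            return "REQUEST_TIMED_OUT";
        }
        if (cause instanceof IllegalStateException) {
            return "COORDINATOR_NOT_AVAILABLE";
        }
        return "UNKNOWN_SERVER_ERROR";
    }

    public static void main(String[] args) {
        Throwable wrapped = new CompletionException(new TimeoutException("write timed out"));
        System.out.println(toClientError(wrapped));
    }
}
```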






[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-22 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1238418966


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -171,70 +260,152 @@ GroupMetadataManager build() {
 /**
  * The maximum number of members allowed in a single consumer group.
  */
-private final int consumerGroupMaxSize;
+private final int groupMaxSize;
 
 /**
  * The heartbeat interval for consumer groups.
  */
 private final int consumerGroupHeartbeatIntervalMs;
 
 /**
- * The topics metadata (or image).
+ * The metadata image.
+ */
+private MetadataImage metadataImage;
+
+// Rest of the fields are used for the generic group APIs.
+
+/**
+ * An empty result returned to the state machine. This means that
+ * there are no records to append to the log.
+ *
+ * Package private for testing.
+ */
+static final CoordinatorResult, Record> 
EMPTY_RESULT =
+new CoordinatorResult<>(Collections.emptyList(), 
CompletableFuture.completedFuture(null));
+
+/**
+ * Initial rebalance delay for members joining a generic group.
+ */
+private final int initialRebalanceDelayMs;
+
+/**
+ * The timeout used to wait for a new member in milliseconds.
+ */
+private final int newMemberJoinTimeoutMs;
+
+/**
+ * The group minimum session timeout.
+ */
+private final int groupMinSessionTimeoutMs;
+
+/**
+ * The group maximum session timeout.
+ */
+private final int groupMaxSessionTimeoutMs;
+
+/**
+ * The timer to add and cancel group operations.
  */
-private TopicsImage topicsImage;
+private final Timer, Record> timer;
+
+/**
+ * The time.
+ */
+private final Time time;
 
 private GroupMetadataManager(
 SnapshotRegistry snapshotRegistry,
 LogContext logContext,
 List assignors,
-TopicsImage topicsImage,
-int consumerGroupMaxSize,
-int consumerGroupHeartbeatIntervalMs
+MetadataImage metadataImage,
+TopicPartition topicPartition,
+int groupMaxSize,
+int consumerGroupHeartbeatIntervalMs,
+int initialRebalanceDelayMs,
+int newMemberJoinTimeoutMs,
+int groupMinSessionTimeoutMs,
+int groupMaxSessionTimeoutMs,
+Timer, Record> timer,
+Time time
 ) {
+this.logContext = logContext;
 this.log = logContext.logger(GroupMetadataManager.class);
 this.snapshotRegistry = snapshotRegistry;
-this.topicsImage = topicsImage;
+this.metadataImage = metadataImage;
 this.assignors = 
assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, 
Function.identity()));
+this.topicPartition = topicPartition;
 this.defaultAssignor = assignors.get(0);
 this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
-this.consumerGroupMaxSize = consumerGroupMaxSize;
+this.groupMaxSize = groupMaxSize;
 this.consumerGroupHeartbeatIntervalMs = 
consumerGroupHeartbeatIntervalMs;
+this.initialRebalanceDelayMs = initialRebalanceDelayMs;
+this.newMemberJoinTimeoutMs = newMemberJoinTimeoutMs;
+this.groupMinSessionTimeoutMs = groupMinSessionTimeoutMs;
+this.groupMaxSessionTimeoutMs = groupMaxSessionTimeoutMs;
+this.timer = timer;
+this.time = time;
+}
+
+/**
+ * When a new metadata image is pushed.
+ *
+ * @param metadataImage The new metadata image.
+ */
+public void onNewMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
 }
 
 /**
  * Gets or maybe creates a consumer group.
  *
  * @param groupId   The group id.
+ * @param groupType The group type (generic or consumer).
  * @param createIfNotExists A boolean indicating whether the group should 
be
  *  created if it does not exist.
  *
  * @return A ConsumerGroup.
+ * @throws InvalidGroupIdException  if the group id is invalid.
  * @throws GroupIdNotFoundException if the group does not exist and 
createIfNotExists is false or
  *  if the group is not a consumer group.
  *
  * Package private for testing.
  */
-ConsumerGroup getOrMaybeCreateConsumerGroup(
+// Package private for testing.
+Group getOrMaybeCreateGroup(
 String groupId,
+Group.GroupType groupType,
 boolean createIfNotExists
-) throws GroupIdNotFoundException {
+) throws InvalidGroupIdException, GroupIdNotFoundException {
+if (groupId == null || groupId.isEmpty()) {
+throw new InvalidGroupIdException(String.format("Group id %s is 
invalid.", groupId));
+}

Review Comment:
   i think that static validation could be done in the group 

[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-22 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1238417804


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -171,70 +260,152 @@ GroupMetadataManager build() {
 /**
  * The maximum number of members allowed in a single consumer group.
  */
-private final int consumerGroupMaxSize;
+private final int groupMaxSize;
 
 /**
  * The heartbeat interval for consumer groups.
  */
 private final int consumerGroupHeartbeatIntervalMs;
 
 /**
- * The topics metadata (or image).
+ * The metadata image.
+ */
+private MetadataImage metadataImage;
+
+// Rest of the fields are used for the generic group APIs.
+
+/**
+ * An empty result returned to the state machine. This means that
+ * there are no records to append to the log.
+ *
+ * Package private for testing.
+ */
+static final CoordinatorResult, Record> 
EMPTY_RESULT =
+new CoordinatorResult<>(Collections.emptyList(), 
CompletableFuture.completedFuture(null));
+
+/**
+ * Initial rebalance delay for members joining a generic group.
+ */
+private final int initialRebalanceDelayMs;
+
+/**
+ * The timeout used to wait for a new member in milliseconds.
+ */
+private final int newMemberJoinTimeoutMs;
+
+/**
+ * The group minimum session timeout.
+ */
+private final int groupMinSessionTimeoutMs;
+
+/**
+ * The group maximum session timeout.
+ */
+private final int groupMaxSessionTimeoutMs;
+
+/**
+ * The timer to add and cancel group operations.
  */
-private TopicsImage topicsImage;
+private final Timer, Record> timer;
+
+/**
+ * The time.
+ */
+private final Time time;
 
 private GroupMetadataManager(
 SnapshotRegistry snapshotRegistry,
 LogContext logContext,
 List assignors,
-TopicsImage topicsImage,
-int consumerGroupMaxSize,
-int consumerGroupHeartbeatIntervalMs
+MetadataImage metadataImage,
+TopicPartition topicPartition,
+int groupMaxSize,
+int consumerGroupHeartbeatIntervalMs,
+int initialRebalanceDelayMs,
+int newMemberJoinTimeoutMs,
+int groupMinSessionTimeoutMs,
+int groupMaxSessionTimeoutMs,
+Timer, Record> timer,
+Time time
 ) {
+this.logContext = logContext;
 this.log = logContext.logger(GroupMetadataManager.class);
 this.snapshotRegistry = snapshotRegistry;
-this.topicsImage = topicsImage;
+this.metadataImage = metadataImage;
 this.assignors = 
assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, 
Function.identity()));
+this.topicPartition = topicPartition;
 this.defaultAssignor = assignors.get(0);
 this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
-this.consumerGroupMaxSize = consumerGroupMaxSize;
+this.groupMaxSize = groupMaxSize;
 this.consumerGroupHeartbeatIntervalMs = 
consumerGroupHeartbeatIntervalMs;
+this.initialRebalanceDelayMs = initialRebalanceDelayMs;
+this.newMemberJoinTimeoutMs = newMemberJoinTimeoutMs;
+this.groupMinSessionTimeoutMs = groupMinSessionTimeoutMs;
+this.groupMaxSessionTimeoutMs = groupMaxSessionTimeoutMs;
+this.timer = timer;
+this.time = time;
+}
+
+/**
+ * When a new metadata image is pushed.
+ *
+ * @param metadataImage The new metadata image.
+ */
+public void onNewMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
 }
 
 /**
  * Gets or maybe creates a consumer group.
  *
  * @param groupId   The group id.
+ * @param groupType The group type (generic or consumer).
  * @param createIfNotExists A boolean indicating whether the group should 
be
  *  created if it does not exist.
  *
  * @return A ConsumerGroup.
+ * @throws InvalidGroupIdException  if the group id is invalid.
  * @throws GroupIdNotFoundException if the group does not exist and 
createIfNotExists is false or
  *  if the group is not a consumer group.
  *
  * Package private for testing.
  */
-ConsumerGroup getOrMaybeCreateConsumerGroup(
+// Package private for testing.
+Group getOrMaybeCreateGroup(

Review Comment:
   I am not convinced. The downside is that it will be harder to guarantee the 
uniqueness of the group id. It also means that we would have to check both maps 
for all other operations (e.g. list, delete, etc.). I think that it would be 
better to keep them in a single map.
   
   For this particular case, we could just have two methods: 
`getOrMaybeCreateConsumerGroup` and `getOrMaybeCreateGenericGroup`.
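   A simplified sketch of that layout, with hypothetical `Group`, `ConsumerGroup` and `GenericGroup` stand-ins and `IllegalStateException` in place of the real exception types. A single map guarantees that a group id is unique across both group types, while the two typed accessors keep call sites free of casts.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class SingleMapSketch {
    interface Group { String groupId(); }

    static final class ConsumerGroup implements Group {
        private final String groupId;
        ConsumerGroup(String groupId) { this.groupId = groupId; }
        public String groupId() { return groupId; }
    }

    static final class GenericGroup implements Group {
        private final String groupId;
        GenericGroup(String groupId) { this.groupId = groupId; }
        public String groupId() { return groupId; }
    }

    // One map for every group type, so an id can only ever belong to one group.
    private final Map<String, Group> groups = new HashMap<>();

    ConsumerGroup getOrMaybeCreateConsumerGroup(String groupId, boolean createIfNotExists) {
        return getOrMaybeCreate(groupId, createIfNotExists, ConsumerGroup.class, ConsumerGroup::new);
    }

    GenericGroup getOrMaybeCreateGenericGroup(String groupId, boolean createIfNotExists) {
        return getOrMaybeCreate(groupId, createIfNotExists, GenericGroup.class, GenericGroup::new);
    }

    private <T extends Group> T getOrMaybeCreate(
        String groupId,
        boolean createIfNotExists,
        Class<T> type,
        Function<String, T> factory
    ) {
        Group group = groups.get(groupId);
        if (group == null) {
            if (!createIfNotExists) {
                throw new IllegalStateException("Group " + groupId + " not found.");
            }
            T created = factory.apply(groupId);
            groups.put(groupId, created);
            return created;
        }
        if (!type.isInstance(group)) {
            // The id is already taken by a group of the other type.
            throw new IllegalStateException(
                "Group " + groupId + " is not a " + type.getSimpleName() + ".");
        }
        return type.cast(group);
    }

    public static void main(String[] args) {
        SingleMapSketch manager = new SingleMapSketch();
        manager.getOrMaybeCreateConsumerGroup("group-a", true);
        try {
            manager.getOrMaybeCreateGenericGroup("group-a", true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Other operations such as list or delete then only need to walk this one map.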



-- 
This is an