[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1266270587

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ##
@@ -301,10 +302,12 @@ public CompletableFuture joinGroup(
             topicPartitionFor(request.groupId()),
             coordinator -> coordinator.genericGroupJoin(context, request, responseFuture)
         ).exceptionally(exception -> {
-            if (!responseFuture.isDone()) {
+            if (!(exception instanceof KafkaException)) {
                 log.error("Request {} hit an unexpected exception: {}",

Review Comment:
nit: Could we say `JoinGroup request {} hit`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1266269140 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1245,4 +1422,1304 @@ public static String consumerGroupSessionTimeoutKey(String groupId, String membe public static String consumerGroupRevocationTimeoutKey(String groupId, String memberId) { return "revocation-timeout-" + groupId + "-" + memberId; } + +/** + * Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should be removed. +removeGroup(groupId); +} else { +List loadedMembers = new ArrayList<>(); +for (GroupMetadataValue.MemberMetadata member : value.members()) { +int rebalanceTimeout = member.rebalanceTimeout() == -1 ? +member.sessionTimeout() : member.rebalanceTimeout(); + +JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection(); +supportedProtocols.add(new JoinGroupRequestProtocol() +.setName(value.protocol()) +.setMetadata(member.subscription())); + +GenericGroupMember loadedMember = new GenericGroupMember( +member.memberId(), +Optional.ofNullable(member.groupInstanceId()), +member.clientId(), +member.clientHost(), +rebalanceTimeout, +member.sessionTimeout(), +value.protocolType(), +supportedProtocols, +member.assignment() +); + +loadedMembers.add(loadedMember); +} + +String protocolType = value.protocolType(); + +GenericGroup genericGroup = new GenericGroup( +this.logContext, +groupId, +loadedMembers.isEmpty() ? EMPTY : STABLE, +time, +value.generation(), +protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType), +Optional.ofNullable(value.protocol()), +Optional.ofNullable(value.leader()), +value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp()) +); + +loadedMembers.forEach(member -> genericGroup.add(member, null)); +groups.put(groupId, genericGroup); + +genericGroup.setSubscribedTopics( +genericGroup.computeSubscribedTopics() +); +} +} + +/** + * Handle a JoinGroupRequest. + * + * @param context The request context. + * @param request The actual JoinGroup request. + * + * @return The result that contains records to append if the join group phase completes. + */ +public CoordinatorResult genericGroupJoin( +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) { +CoordinatorResult result = EMPTY_RESULT; + +String groupId = request.groupId(); +String memberId = request.memberId(); +int sessionTimeoutMs = request.sessionTimeoutMs(); + +if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs || +sessionTimeoutMs > genericGroupMaxSessionTimeoutMs +) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) +); +} else { +boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +// Group is created if it does not exist and the member id is UNKNOWN. if member +// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND +GenericGroup group; +boolean isNewGroup = !groups.containsKey(groupId); +try { +group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember); +} catch (Throwable t) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.forException(t).code()) +); +return EMPTY_RESULT; +} + +if (!acceptJoiningMember(group, memberId)) { +group.remove(memberId); +responseFuture.complete(new JoinGroupResponseData() +
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1266267667

## core/src/main/scala/kafka/server/BrokerServer.scala: ##
@@ -531,7 +531,12 @@ class BrokerServer(
         config.consumerGroupHeartbeatIntervalMs,
         config.consumerGroupMaxSize,
         config.consumerGroupAssignors,
-        config.offsetsTopicSegmentBytes
+        config.offsetsTopicSegmentBytes,
+        config.groupMaxSize,
+        config.groupInitialRebalanceDelay,
+        5 * 60 * 1000,

Review Comment:
Should we replace this by a constant if we can't change it based on config?
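One way to read the suggestion above is to give the inline literal a name. The following is only a minimal sketch: the class `GroupCoordinatorDefaults` and the constant `HARDCODED_TIMEOUT_MS` are hypothetical names chosen for illustration, and the thread does not say where such a constant would live or what the value controls.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder class; neither the class nor the constant name comes from the PR.
final class GroupCoordinatorDefaults {
    // Same value as the inline literal 5 * 60 * 1000 (five minutes in milliseconds).
    static final int HARDCODED_TIMEOUT_MS = (int) TimeUnit.MINUTES.toMillis(5);

    private GroupCoordinatorDefaults() { }
}
```

The call site would then pass `GroupCoordinatorDefaults.HARDCODED_TIMEOUT_MS` in place of the arithmetic expression, which keeps the value unchanged while making it searchable.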
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1266264686 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1245,4 +1422,1304 @@ public static String consumerGroupSessionTimeoutKey(String groupId, String membe public static String consumerGroupRevocationTimeoutKey(String groupId, String memberId) { return "revocation-timeout-" + groupId + "-" + memberId; } + +/** + * Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should be removed. +removeGroup(groupId); +} else { +List loadedMembers = new ArrayList<>(); +for (GroupMetadataValue.MemberMetadata member : value.members()) { +int rebalanceTimeout = member.rebalanceTimeout() == -1 ? +member.sessionTimeout() : member.rebalanceTimeout(); + +JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection(); +supportedProtocols.add(new JoinGroupRequestProtocol() +.setName(value.protocol()) +.setMetadata(member.subscription())); + +GenericGroupMember loadedMember = new GenericGroupMember( +member.memberId(), +Optional.ofNullable(member.groupInstanceId()), +member.clientId(), +member.clientHost(), +rebalanceTimeout, +member.sessionTimeout(), +value.protocolType(), +supportedProtocols, +member.assignment() +); + +loadedMembers.add(loadedMember); +} + +String protocolType = value.protocolType(); + +GenericGroup genericGroup = new GenericGroup( +this.logContext, +groupId, +loadedMembers.isEmpty() ? EMPTY : STABLE, +time, +value.generation(), +protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType), +Optional.ofNullable(value.protocol()), +Optional.ofNullable(value.leader()), +value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp()) +); + +loadedMembers.forEach(member -> genericGroup.add(member, null)); +groups.put(groupId, genericGroup); + +genericGroup.setSubscribedTopics( +genericGroup.computeSubscribedTopics() +); +} +} + +/** + * Handle a JoinGroupRequest. + * + * @param context The request context. + * @param request The actual JoinGroup request. + * + * @return The result that contains records to append if the join group phase completes. + */ +public CoordinatorResult genericGroupJoin( +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) { +CoordinatorResult result = EMPTY_RESULT; + +String groupId = request.groupId(); +String memberId = request.memberId(); +int sessionTimeoutMs = request.sessionTimeoutMs(); + +if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs || +sessionTimeoutMs > genericGroupMaxSessionTimeoutMs +) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) +); +} else { +boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +// Group is created if it does not exist and the member id is UNKNOWN. if member +// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND +GenericGroup group; +boolean isNewGroup = !groups.containsKey(groupId); +try { +group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember); +} catch (Throwable t) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.forException(t).code()) +); +return EMPTY_RESULT; +} + +if (!acceptJoiningMember(group, memberId)) { +group.remove(memberId); +responseFuture.complete(new JoinGroupResponseData() +
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1265807612 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -3022,143 +3242,2087 @@ public void testOnLoaded() { assertNotNull(context.timer.timeout(consumerGroupRevocationTimeoutKey("foo", "foo-1"))); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} - -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); -} -} +@Test +public void testGenerateRecordsOnNewGroup() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. 
-return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +CompletableFuture responseFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupJoin(request, responseFuture, true); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.MEMBER_ID_REQUIRED.code(), responseFuture.get().errorCode()); -if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), fromAssignment(actual.pendingTopicPartitions( -return false; +GenericGroup group = context.createGenericGroup("group-id"); -return Objects.equals(fromAssignment(expected.assignedTopicPartitions()), fromAssignment(actual.assignedTopicPartitions())); +assertEquals( + Collections.singletonList(RecordHelpers.newEmptyGroupMetadataRecord(group, MetadataVersion.latest())), +result.records() +); } -private Map> fromAssignment( -List assignment -) { -if (assignment == null) return null; +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); +context.createGenericGroup("group-id"); -Map> assignmentMap = new HashMap<>(); -assignment.forEach(topicPartitions -> { -assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions())); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + 
+IntStream.range(0, 10).forEach(i -> { +CompletableFuture responseFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupJoin(request, responseFuture); +assertFalse(responseFuture.isDone()); +assertTrue(result.records().isEmpty()); }); -return assignmentMap; + +CompletableFuture responseFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupJoin(request, responseFuture); +assertTrue(result.records().isEmpty()); +assertTrue(responseFuture.isDone()); +
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1265807220

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ##
@@ -299,10 +299,10 @@ public CompletableFuture joinGroup(
             topicPartitionFor(request.groupId()),
             coordinator -> coordinator.genericGroupJoin(context, request, responseFuture)
         ).exceptionally(exception -> {
-            log.error("Request {} hit an unexpected exception: {}",
-                request, exception.getMessage());
-
             if (!responseFuture.isDone()) {
+                log.error("Request {} hit an unexpected exception: {}",

Review Comment:
This would still log in expected cases, no? For instance, when the coordinator for the group is inactive, loading, etc. If you really want to log something, you could perhaps log only if `exception` is not a KafkaException or only when it is a RuntimeException for instance.
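The filtering idea in this comment could look roughly like the sketch below. It is not the PR's code: `ExpectedException` is a local stand-in for `org.apache.kafka.common.KafkaException` so the example compiles without Kafka on the classpath, and `shouldLog` is a hypothetical helper name. The unwrapping of `CompletionException` is an added assumption, since callbacks attached to dependent stages can receive the failure wrapped.

```java
import java.util.concurrent.CompletionException;

final class SelectiveErrorLogging {
    // Stand-in for org.apache.kafka.common.KafkaException (assumption, for self-containment).
    static class ExpectedException extends RuntimeException {
        ExpectedException(String message) {
            super(message);
        }
    }

    private SelectiveErrorLogging() { }

    // Hypothetical helper: returns true only for failures that are not "expected" errors.
    static boolean shouldLog(Throwable exception) {
        Throwable cause = exception;
        // Dependent CompletableFuture stages may wrap the original failure.
        if (exception instanceof CompletionException && exception.getCause() != null) {
            cause = exception.getCause();
        }
        return !(cause instanceof ExpectedException);
    }
}
```

Inside an `exceptionally` callback, the `log.error(...)` call would then be guarded by `if (shouldLog(exception))`, so failures such as a coordinator that is still loading do not produce error-level log lines.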
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1264968675 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( 
+request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. -return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264967746

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ##
@@ -266,9 +282,32 @@ public CompletableFuture joinGroup(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }

-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        CompletableFuture responseFuture = new CompletableFuture<>();
+
+        if (!isGroupIdNotEmpty(request.groupId())) {
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(request.memberId())
+                .setErrorCode(Errors.INVALID_GROUP_ID.code()));
+
+            return responseFuture;
+        }
+
+        runtime.scheduleWriteOperation("generic-group-join",
+            topicPartitionFor(request.groupId()),
+            coordinator -> coordinator.genericGroupJoin(context, request, responseFuture)
+        ).exceptionally(exception -> {
+            log.error("Request {} hit an unexpected exception: {}",
+                request, exception.getMessage());

Review Comment:
This was not addressed.
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264965443

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ##
@@ -561,7 +625,7 @@ public boolean hasReceivedSyncFromAllMembers() {
      * @return members that have yet to sync.
      */
     public Set allPendingSyncMembers() {
-        return pendingSyncMembers;
+        return new HashSet<>(pendingSyncMembers);

Review Comment:
This was not addressed.
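For context on what the hunk above changes, the sketch below contrasts returning the internal set with returning a copy. It does not take a position on which the reviewer prefers, since the earlier comment is not part of this message. `PendingSyncTracker`, `liveView`, and `snapshot` are hypothetical names; only the two `return` forms mirror the diff.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the part of GenericGroup that tracks pending sync members.
final class PendingSyncTracker {
    private final Set<String> pendingSyncMembers = new HashSet<>();

    void addPending(String memberId) {
        pendingSyncMembers.add(memberId);
    }

    // Removed form in the diff: callers share (and can mutate) the internal set.
    Set<String> liveView() {
        return pendingSyncMembers;
    }

    // Added form in the diff: callers get an independent copy taken at call time.
    Set<String> snapshot() {
        return new HashSet<>(pendingSyncMembers);
    }
}
```

The copy costs an allocation per call but means a caller can iterate or modify the result without affecting the group's state, and later changes to the group are not visible through an earlier snapshot.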
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264964347

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ##
@@ -579,4 +618,32 @@ public void shutdown() {
         Utils.closeQuietly(runtime, "coordinator runtime");
         log.info("Shutdown complete.");
     }
+
+    private static boolean isGroupIdNotEmpty(String groupId) {
+        return groupId != null && !groupId.isEmpty();
+    }
+
+    private static Errors toResponseError(Errors appendError) {
+        switch (appendError) {
+            case UNKNOWN_TOPIC_OR_PARTITION:
+            case NOT_ENOUGH_REPLICAS:
+            case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
+                return COORDINATOR_NOT_AVAILABLE;
+
+            case NOT_LEADER_OR_FOLLOWER:
+            case KAFKA_STORAGE_ERROR:
+                return NOT_COORDINATOR;
+
+            case REQUEST_TIMED_OUT:

Review Comment:
This request timeout was coming from the delayed produce op in the purgatory. We don't have this anymore.
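If the `REQUEST_TIMED_OUT` case is dropped as the comment implies, the mapping visible in the hunk reduces to something like the sketch below. The enum `Err` is a local stand-in for Kafka's `Errors` so the example compiles on its own, and the `default` branch that passes the error through unchanged is an assumption; the quoted diff is cut off before it shows how unmapped errors are handled.

```java
final class AppendErrorMapping {
    // Local stand-in for org.apache.kafka.common.protocol.Errors (assumption, for self-containment).
    enum Err {
        UNKNOWN_TOPIC_OR_PARTITION,
        NOT_ENOUGH_REPLICAS,
        NOT_ENOUGH_REPLICAS_AFTER_APPEND,
        NOT_LEADER_OR_FOLLOWER,
        KAFKA_STORAGE_ERROR,
        REQUEST_TIMED_OUT,
        COORDINATOR_NOT_AVAILABLE,
        NOT_COORDINATOR
    }

    private AppendErrorMapping() { }

    // Translates a log-append error into the error returned to the group member.
    static Err toResponseError(Err appendError) {
        switch (appendError) {
            case UNKNOWN_TOPIC_OR_PARTITION:
            case NOT_ENOUGH_REPLICAS:
            case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
                return Err.COORDINATOR_NOT_AVAILABLE;

            case NOT_LEADER_OR_FOLLOWER:
            case KAFKA_STORAGE_ERROR:
                return Err.NOT_COORDINATOR;

            default:
                // Assumed pass-through; no special case for REQUEST_TIMED_OUT.
                return appendError;
        }
    }
}
```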
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264962406

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
@@ -5235,6 +5316,11 @@ private List verifyGenericGroupJoinResponses(
         return memberIds;
     }

+    private void assertEmptyResult(List> timeouts) {

Review Comment:
nit: `assertNoOrEmptyResult`?
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1264961072 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -3022,143 +3242,2087 @@ public void testOnLoaded() { assertNotNull(context.timer.timeout(consumerGroupRevocationTimeoutKey("foo", "foo-1"))); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} - -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); -} -} +@Test +public void testGenerateRecordsOnNewGroup() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. 
-return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +CompletableFuture responseFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupJoin(request, responseFuture, true); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.MEMBER_ID_REQUIRED.code(), responseFuture.get().errorCode()); -if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), fromAssignment(actual.pendingTopicPartitions( -return false; +GenericGroup group = context.createGenericGroup("group-id"); -return Objects.equals(fromAssignment(expected.assignedTopicPartitions()), fromAssignment(actual.assignedTopicPartitions())); +assertEquals( + Collections.singletonList(RecordHelpers.newEmptyGroupMetadataRecord(group, MetadataVersion.latest())), +result.records() +); } -private Map> fromAssignment( -List assignment -) { -if (assignment == null) return null; +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); +context.createGenericGroup("group-id"); -Map> assignmentMap = new HashMap<>(); -assignment.forEach(topicPartitions -> { -assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions())); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + 
+IntStream.range(0, 10).forEach(i -> { +CompletableFuture responseFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupJoin(request, responseFuture); +assertFalse(responseFuture.isDone()); +assertTrue(result.records().isEmpty()); }); -return assignmentMap; + +CompletableFuture responseFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupJoin(request, responseFuture); +assertTrue(result.records().isEmpty()); +assertTrue(responseFuture.isDone()); +
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1264955867 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1243,4 +1406,1283 @@ public static String consumerGroupSessionTimeoutKey(String groupId, String membe public static String consumerGroupRevocationTimeoutKey(String groupId, String memberId) { return "revocation-timeout-" + groupId + "-" + memberId; } + + /** Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should be removed. +groups.remove(groupId); +} else { +List loadedMembers = new ArrayList<>(); +for (GroupMetadataValue.MemberMetadata member : value.members()) { +int rebalanceTimeout = member.rebalanceTimeout() == -1 ? +member.sessionTimeout() : member.rebalanceTimeout(); + +JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection(); +supportedProtocols.add(new JoinGroupRequestProtocol() +.setName(value.protocol()) +.setMetadata(member.subscription())); + +GenericGroupMember loadedMember = new GenericGroupMember( +member.memberId(), +Optional.ofNullable(member.groupInstanceId()), +member.clientId(), +member.clientHost(), +rebalanceTimeout, +member.sessionTimeout(), +value.protocolType(), +supportedProtocols, +member.assignment() +); + +loadedMembers.add(loadedMember); +} + +String protocolType = value.protocolType(); + +GenericGroup genericGroup = new GenericGroup( +this.logContext, +groupId, +loadedMembers.isEmpty() ? EMPTY : STABLE, +time, +value.generation(), +protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType), +Optional.ofNullable(value.protocol()), +Optional.ofNullable(value.leader()), +value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp()) +); + +loadedMembers.forEach(member -> { +genericGroup.add(member, null); +log.info("Loaded member {} in group {} with generation {}.", +member.memberId(), groupId, genericGroup.generationId()); +}); + +genericGroup.setSubscribedTopics( +genericGroup.computeSubscribedTopics() +); +} +} + +/** + * Handle a JoinGroupRequest. + * + * @param context The request context. + * @param request The actual JoinGroup request. + * + * @return The result that contains records to append if the join group phase completes. + */ +public CoordinatorResult genericGroupJoin( +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) { +CoordinatorResult result = EMPTY_RESULT; + +String groupId = request.groupId(); +String memberId = request.memberId(); +int sessionTimeoutMs = request.sessionTimeoutMs(); + +if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs || +sessionTimeoutMs > genericGroupMaxSessionTimeoutMs +) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) +); +} else { +boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +// Group is created if it does not exist and the member id is UNKNOWN. if member +// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND +GenericGroup group; +boolean isNewGroup = !groups.containsKey(groupId); +try { +group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember); +} catch (Throwable t) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.forException(t).code()) +); +return EMPTY_RESULT; +} + +if (!acceptJoiningMember(group,
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1264954924 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1243,4 +1406,1283 @@ public static String consumerGroupSessionTimeoutKey(String groupId, String membe public static String consumerGroupRevocationTimeoutKey(String groupId, String memberId) { return "revocation-timeout-" + groupId + "-" + memberId; } + + /** Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should be removed. +groups.remove(groupId); +} else { +List loadedMembers = new ArrayList<>(); +for (GroupMetadataValue.MemberMetadata member : value.members()) { +int rebalanceTimeout = member.rebalanceTimeout() == -1 ? +member.sessionTimeout() : member.rebalanceTimeout(); + +JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection(); +supportedProtocols.add(new JoinGroupRequestProtocol() +.setName(value.protocol()) +.setMetadata(member.subscription())); + +GenericGroupMember loadedMember = new GenericGroupMember( +member.memberId(), +Optional.ofNullable(member.groupInstanceId()), +member.clientId(), +member.clientHost(), +rebalanceTimeout, +member.sessionTimeout(), +value.protocolType(), +supportedProtocols, +member.assignment() +); + +loadedMembers.add(loadedMember); +} + +String protocolType = value.protocolType(); + +GenericGroup genericGroup = new GenericGroup( +this.logContext, +groupId, +loadedMembers.isEmpty() ? EMPTY : STABLE, +time, +value.generation(), +protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType), +Optional.ofNullable(value.protocol()), +Optional.ofNullable(value.leader()), +value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp()) +); + +loadedMembers.forEach(member -> { +genericGroup.add(member, null); +log.info("Loaded member {} in group {} with generation {}.", +member.memberId(), groupId, genericGroup.generationId()); +}); + +genericGroup.setSubscribedTopics( +genericGroup.computeSubscribedTopics() +); +} +} + +/** + * Handle a JoinGroupRequest. + * + * @param context The request context. + * @param request The actual JoinGroup request. + * + * @return The result that contains records to append if the join group phase completes. + */ +public CoordinatorResult genericGroupJoin( +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) { +CoordinatorResult result = EMPTY_RESULT; + +String groupId = request.groupId(); +String memberId = request.memberId(); +int sessionTimeoutMs = request.sessionTimeoutMs(); + +if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs || +sessionTimeoutMs > genericGroupMaxSessionTimeoutMs +) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) +); +} else { +boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +// Group is created if it does not exist and the member id is UNKNOWN. if member +// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND +GenericGroup group; +boolean isNewGroup = !groups.containsKey(groupId); +try { +group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember); +} catch (Throwable t) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.forException(t).code()) +); +return EMPTY_RESULT; +} + +if (!acceptJoiningMember(group,
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1264953365 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -247,31 +324,78 @@ GroupMetadataManager build() { */ private MetadataImage metadataImage; +/** + * An empty result returned to the state machine. This means that + * there are no records to append to the log. + * + * Package private for testing. + */ +static final CoordinatorResult EMPTY_RESULT = +new CoordinatorResult<>(Collections.emptyList(), CompletableFuture.completedFuture(null)); Review Comment: ah.. did not think about that.
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1247814090 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -266,9 +295,21 @@ public CompletableFuture joinGroup( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +CompletableFuture responseFuture = new CompletableFuture<>(); + +if (!isValidGroupId(request.groupId(), ApiKeys.forId(request.apiKey()))) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(request.memberId()) +.setErrorCode(Errors.INVALID_GROUP_ID.code())); + +return responseFuture; +} + +runtime.scheduleWriteOperation("generic-group-join", Review Comment: I wonder if we need to handle the future returned by `scheduleWriteOperation` as well. At minimum, we may want to react to errors. This could for instance happen if something goes wrong before the join group handling event is triggered. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -578,4 +619,19 @@ public void shutdown() { Utils.closeQuietly(runtime, "coordinator runtime"); log.info("Shutdown complete."); } + +private boolean isValidGroupId(String groupId, ApiKeys api) { +if (api == ApiKeys.OFFSET_COMMIT || +api == ApiKeys.OFFSET_FETCH || +api == ApiKeys.DESCRIBE_GROUPS || +api == ApiKeys.DELETE_GROUPS Review Comment: nit: I am not a fan of this validation. I wonder if we should just have two helpers: `isGroupIdNotNull` and `isGroupIdNotEmpty`. In this PR, we would only need `isGroupIdNotEmpty`. What do you think?
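To make the two-helper suggestion concrete, here is a minimal sketch of what the helpers could look like. Only the names `isGroupIdNotNull` and `isGroupIdNotEmpty` come from the comment; the wrapper class `GroupIdChecks` and the method bodies are assumptions for illustration, not code from the PR.

```java
// Hypothetical helper class; only the two method names come from the review comment.
final class GroupIdChecks {
    private GroupIdChecks() { }

    // True when a group id was supplied at all (the empty string is accepted).
    static boolean isGroupIdNotNull(String groupId) {
        return groupId != null;
    }

    // True when the group id is present and has at least one character.
    static boolean isGroupIdNotEmpty(String groupId) {
        return groupId != null && !groupId.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(isGroupIdNotNull(""));
        System.out.println(isGroupIdNotEmpty(""));
        System.out.println(isGroupIdNotEmpty("my-group"));
    }
}
```

With this split, each API picks the check it needs at the call site instead of switching on the API key inside a single validator. The join path would presumably call only `isGroupIdNotEmpty`.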
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1087,1348 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value, +short version +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should not be added. +} else { +List loadedMembers = new ArrayList<>(); +for (GroupMetadataValue.MemberMetadata member : value.members()) { +int rebalanceTimeout = version == 0 ? member.sessionTimeout() : member.rebalanceTimeout(); Review Comment: I wonder if we could avoid passing the version to this method by adding `-1` as the default value of `rebalanceTimeout` in `GroupMetadataValue`. It seems that we could rely on this to decide here. Another way that I was thinking about would be to pass the `Record` to the replay method as it contains all the available information. Have you considered this? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1087,1348 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value, +short version +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should not be added. Review Comment: I think that the group should be deleted in this case. 
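A rough sketch of the two points above, under simplifying assumptions: the class `GenericGroupReplaySketch`, its map of group id to generation, and the `UNSET_TIMEOUT` constant are hypothetical stand-ins, not the Kafka types. It shows a `-1` default on the rebalance timeout replacing the `version` parameter, and a tombstone removing the group from the soft state.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-in for the replay logic; these are not the Kafka classes.
final class GenericGroupReplaySketch {
    // Sentinel assumed to be the default of rebalanceTimeout when the field was never written.
    static final int UNSET_TIMEOUT = -1;

    // Group id -> generation; stands in for the coordinator's soft state.
    private final Map<String, Integer> groups = new HashMap<>();

    // Falls back to the session timeout when the rebalance timeout is unset,
    // so the caller no longer needs to know the record version.
    static int effectiveRebalanceTimeout(int sessionTimeoutMs, int rebalanceTimeoutMs) {
        return rebalanceTimeoutMs == UNSET_TIMEOUT ? sessionTimeoutMs : rebalanceTimeoutMs;
    }

    // A null generation models a tombstone: the group is removed rather than ignored.
    void replay(String groupId, Integer generation) {
        if (generation == null) {
            groups.remove(groupId);
        } else {
            groups.put(groupId, generation);
        }
    }

    boolean contains(String groupId) {
        return groups.containsKey(groupId);
    }

    public static void main(String[] args) {
        GenericGroupReplaySketch state = new GenericGroupReplaySketch();
        state.replay("group-a", 5);
        state.replay("group-a", null);
        System.out.println(state.contains("group-a"));
        System.out.println(effectiveRebalanceTimeout(10_000, UNSET_TIMEOUT));
    }
}
```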
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -266,9 +295,21 @@ public CompletableFuture joinGroup( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +CompletableFuture responseFuture = new CompletableFuture<>(); + +if (!isValidGroupId(request.groupId(), ApiKeys.forId(request.apiKey()))) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(request.memberId()) +.setErrorCode(Errors.INVALID_GROUP_ID.code())); + +return responseFuture; +} + +runtime.scheduleGenericGroupOperation("generic-group-join",
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1246778958 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1072,1338 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value, +short version +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should not be added. +// TODO: this needs to be checked in conjunction with empty group offsets. +//if (groups.containsKey(groupId)) { +//throw new IllegalStateException("Unexpected unload of active group " + groupId + +//"while loading partition " + topicPartition); +//} +} else { +List loadedMembers = new ArrayList<>(); +for (GroupMetadataValue.MemberMetadata member : value.members()) { +int rebalanceTimeout = version == 0 ? member.sessionTimeout() : member.rebalanceTimeout(); + +JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection(); +supportedProtocols.add(new JoinGroupRequestProtocol() +.setName(value.protocol()) +.setMetadata(member.subscription())); + +GenericGroupMember loadedMember = new GenericGroupMember( +member.memberId(), +Optional.ofNullable(member.groupInstanceId()), +member.clientId(), +member.clientHost(), +rebalanceTimeout, +member.sessionTimeout(), +value.protocolType(), +supportedProtocols, +member.assignment() +); + +loadedMembers.add(loadedMember); +} + +String protocolType = value.protocolType(); + +GenericGroup genericGroup = new GenericGroup( +this.logContext, +groupId, +loadedMembers.isEmpty() ? EMPTY : STABLE, +time, +value.generation(), +protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType), +Optional.ofNullable(value.protocol()), +Optional.ofNullable(value.leader()), +value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp()) +); + +loadedMembers.forEach(member -> { +genericGroup.add(member, null); +log.info("Loaded member {} in group {} with generation {}.", +member.memberId(), groupId, genericGroup.generationId()); +}); + +genericGroup.setSubscribedTopics( +genericGroup.computeSubscribedTopics() +); +} +} + +/** + * Handle a JoinGroupRequest. + * + * @param context The request context. + * @param request The actual JoinGroup request. + * + * @return The result that contains records to append if the join group phase completes. + */ +public CoordinatorResult, Record> genericGroupJoin( +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) { +CoordinatorResult, Record> result = EMPTY_RESULT; +String groupId = request.groupId(); +String memberId = request.memberId(); +int sessionTimeoutMs = request.sessionTimeoutMs(); + +if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs || +sessionTimeoutMs > genericGroupMaxSessionTimeoutMs +) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) +); +} else { +boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +// Group is created if it does not exist and the member id is UNKNOWN. if member +// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND +GenericGroup group; +try { +group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember); +} catch (Throwable t) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.forException(t).code()) +); +return
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1243806647 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1072,1338 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value, +short version +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should not be added. +// TODO: this needs to be checked in conjunction with empty group offsets. +//if (groups.containsKey(groupId)) { +//throw new IllegalStateException("Unexpected unload of active group " + groupId + +//"while loading partition " + topicPartition); +//} +} else { +List loadedMembers = new ArrayList<>(); +for (GroupMetadataValue.MemberMetadata member : value.members()) { +int rebalanceTimeout = version == 0 ? member.sessionTimeout() : member.rebalanceTimeout(); + +JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection(); +supportedProtocols.add(new JoinGroupRequestProtocol() +.setName(value.protocol()) +.setMetadata(member.subscription())); + +GenericGroupMember loadedMember = new GenericGroupMember( +member.memberId(), +Optional.ofNullable(member.groupInstanceId()), +member.clientId(), +member.clientHost(), +rebalanceTimeout, +member.sessionTimeout(), +value.protocolType(), +supportedProtocols, +member.assignment() +); + +loadedMembers.add(loadedMember); +} + +String protocolType = value.protocolType(); + +GenericGroup genericGroup = new GenericGroup( +this.logContext, +groupId, +loadedMembers.isEmpty() ? EMPTY : STABLE, +time, +value.generation(), +protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType), +Optional.ofNullable(value.protocol()), +Optional.ofNullable(value.leader()), +value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp()) +); + +loadedMembers.forEach(member -> { +genericGroup.add(member, null); +log.info("Loaded member {} in group {} with generation {}.", +member.memberId(), groupId, genericGroup.generationId()); +}); + +genericGroup.setSubscribedTopics( +genericGroup.computeSubscribedTopics() +); +} +} + +/** + * Handle a JoinGroupRequest. + * + * @param context The request context. + * @param request The actual JoinGroup request. + * + * @return The result that contains records to append if the join group phase completes. + */ +public CoordinatorResult, Record> genericGroupJoin( +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) { +CoordinatorResult, Record> result = EMPTY_RESULT; +String groupId = request.groupId(); +String memberId = request.memberId(); +int sessionTimeoutMs = request.sessionTimeoutMs(); + +if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs || +sessionTimeoutMs > genericGroupMaxSessionTimeoutMs +) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) +); +} else { +boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +// Group is created if it does not exist and the member id is UNKNOWN. if member +// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND +GenericGroup group; +try { +group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember); +} catch (Throwable t) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.forException(t).code()) +); +return
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1238725344 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -266,9 +295,21 @@ public CompletableFuture joinGroup( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +CompletableFuture responseFuture = new CompletableFuture<>(); + +if (!isValidGroupId(request.groupId(), ApiKeys.forId(request.apiKey()))) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(request.memberId()) +.setErrorCode(Errors.INVALID_GROUP_ID.code())); + +return responseFuture; +} + +runtime.scheduleGenericGroupOperation("generic-group-join", +topicPartitionFor(request.groupId()), +coordinator -> coordinator.genericGroupJoin(context, request, responseFuture)); + +return responseFuture; Review Comment: We probably need to convert some of the exceptions like I did for the consumer group heartbeat request.
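As an illustration of the kind of conversion meant here, the sketch below turns a failed future into a normal error response using `CompletableFuture.exceptionally`. Everything Kafka-specific is assumed: `JoinErrorMapping`, its nested `Response` type, and the mapping from `TimeoutException` to the string `REQUEST_TIMED_OUT` are placeholders, not the conversion the PR ends up using.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; Response stands in for JoinGroupResponseData.
final class JoinErrorMapping {
    static final class Response {
        final String memberId;
        final String error;

        Response(String memberId, String error) {
            this.memberId = memberId;
            this.error = error;
        }
    }

    // Placeholder mapping from an exception to an error name (assumed, not the PR's mapping).
    static String errorNameFor(Throwable t) {
        // Dependent stages wrap failures in CompletionException, so unwrap first.
        Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
        return cause instanceof TimeoutException ? "REQUEST_TIMED_OUT" : "UNKNOWN_SERVER_ERROR";
    }

    // Returns a future that never fails: exceptions become error responses for the member.
    static CompletableFuture<Response> withErrorConversion(CompletableFuture<Response> future, String memberId) {
        return future.exceptionally(t -> new Response(memberId, errorNameFor(t)));
    }

    public static void main(String[] args) {
        CompletableFuture<Response> failed = new CompletableFuture<>();
        failed.completeExceptionally(new TimeoutException("write timed out"));
        System.out.println(withErrorConversion(failed, "member-1").join().error);
    }
}
```

The caller then always receives a response carrying an error code, rather than an exceptionally completed future it has to unwrap itself.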
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1238418966 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -171,70 +260,152 @@ GroupMetadataManager build() { /** * The maximum number of members allowed in a single consumer group. */ -private final int consumerGroupMaxSize; +private final int groupMaxSize; /** * The heartbeat interval for consumer groups. */ private final int consumerGroupHeartbeatIntervalMs; /** - * The topics metadata (or image). + * The metadata image. + */ +private MetadataImage metadataImage; + +// Rest of the fields are used for the generic group APIs. + +/** + * An empty result returned to the state machine. This means that + * there are no records to append to the log. + * + * Package private for testing. + */ +static final CoordinatorResult, Record> EMPTY_RESULT = +new CoordinatorResult<>(Collections.emptyList(), CompletableFuture.completedFuture(null)); + +/** + * Initial rebalance delay for members joining a generic group. + */ +private final int initialRebalanceDelayMs; + +/** + * The timeout used to wait for a new member in milliseconds. + */ +private final int newMemberJoinTimeoutMs; + +/** + * The group minimum session timeout. + */ +private final int groupMinSessionTimeoutMs; + +/** + * The group maximum session timeout. + */ +private final int groupMaxSessionTimeoutMs; + +/** + * The timer to add and cancel group operations. */ -private TopicsImage topicsImage; +private final Timer, Record> timer; + +/** + * The time. 
+ */ +private final Time time; private GroupMetadataManager( SnapshotRegistry snapshotRegistry, LogContext logContext, List assignors, -TopicsImage topicsImage, -int consumerGroupMaxSize, -int consumerGroupHeartbeatIntervalMs +MetadataImage metadataImage, +TopicPartition topicPartition, +int groupMaxSize, +int consumerGroupHeartbeatIntervalMs, +int initialRebalanceDelayMs, +int newMemberJoinTimeoutMs, +int groupMinSessionTimeoutMs, +int groupMaxSessionTimeoutMs, +Timer, Record> timer, +Time time ) { +this.logContext = logContext; this.log = logContext.logger(GroupMetadataManager.class); this.snapshotRegistry = snapshotRegistry; -this.topicsImage = topicsImage; +this.metadataImage = metadataImage; this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity())); +this.topicPartition = topicPartition; this.defaultAssignor = assignors.get(0); this.groups = new TimelineHashMap<>(snapshotRegistry, 0); -this.consumerGroupMaxSize = consumerGroupMaxSize; +this.groupMaxSize = groupMaxSize; this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs; +this.initialRebalanceDelayMs = initialRebalanceDelayMs; +this.newMemberJoinTimeoutMs = newMemberJoinTimeoutMs; +this.groupMinSessionTimeoutMs = groupMinSessionTimeoutMs; +this.groupMaxSessionTimeoutMs = groupMaxSessionTimeoutMs; +this.timer = timer; +this.time = time; +} + +/** + * When a new metadata image is pushed. + * + * @param metadataImage The new metadata image. + */ +public void onNewMetadataImage(MetadataImage metadataImage) { +this.metadataImage = metadataImage; } /** * Gets or maybe creates a consumer group. * * @param groupId The group id. + * @param groupType The group type (generic or consumer). * @param createIfNotExists A boolean indicating whether the group should be * created if it does not exist. * * @return A ConsumerGroup. + * @throws InvalidGroupIdException if the group id is invalid. 
* @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or * if the group is not a consumer group. * * Package private for testing. */ -ConsumerGroup getOrMaybeCreateConsumerGroup( +// Package private for testing. +Group getOrMaybeCreateGroup( String groupId, +Group.GroupType groupType, boolean createIfNotExists -) throws GroupIdNotFoundException { +) throws InvalidGroupIdException, GroupIdNotFoundException { +if (groupId == null || groupId.isEmpty()) { +throw new InvalidGroupIdException(String.format("Group id %s is invalid.", groupId)); +} Review Comment: i think that static validation could be done in the group
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1238417804 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -171,70 +260,152 @@ GroupMetadataManager build() { /** * The maximum number of members allowed in a single consumer group. */ -private final int consumerGroupMaxSize; +private final int groupMaxSize; /** * The heartbeat interval for consumer groups. */ private final int consumerGroupHeartbeatIntervalMs; /** - * The topics metadata (or image). + * The metadata image. + */ +private MetadataImage metadataImage; + +// Rest of the fields are used for the generic group APIs. + +/** + * An empty result returned to the state machine. This means that + * there are no records to append to the log. + * + * Package private for testing. + */ +static final CoordinatorResult, Record> EMPTY_RESULT = +new CoordinatorResult<>(Collections.emptyList(), CompletableFuture.completedFuture(null)); + +/** + * Initial rebalance delay for members joining a generic group. + */ +private final int initialRebalanceDelayMs; + +/** + * The timeout used to wait for a new member in milliseconds. + */ +private final int newMemberJoinTimeoutMs; + +/** + * The group minimum session timeout. + */ +private final int groupMinSessionTimeoutMs; + +/** + * The group maximum session timeout. + */ +private final int groupMaxSessionTimeoutMs; + +/** + * The timer to add and cancel group operations. */ -private TopicsImage topicsImage; +private final Timer, Record> timer; + +/** + * The time. 
+ */ +private final Time time; private GroupMetadataManager( SnapshotRegistry snapshotRegistry, LogContext logContext, List assignors, -TopicsImage topicsImage, -int consumerGroupMaxSize, -int consumerGroupHeartbeatIntervalMs +MetadataImage metadataImage, +TopicPartition topicPartition, +int groupMaxSize, +int consumerGroupHeartbeatIntervalMs, +int initialRebalanceDelayMs, +int newMemberJoinTimeoutMs, +int groupMinSessionTimeoutMs, +int groupMaxSessionTimeoutMs, +Timer, Record> timer, +Time time ) { +this.logContext = logContext; this.log = logContext.logger(GroupMetadataManager.class); this.snapshotRegistry = snapshotRegistry; -this.topicsImage = topicsImage; +this.metadataImage = metadataImage; this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity())); +this.topicPartition = topicPartition; this.defaultAssignor = assignors.get(0); this.groups = new TimelineHashMap<>(snapshotRegistry, 0); -this.consumerGroupMaxSize = consumerGroupMaxSize; +this.groupMaxSize = groupMaxSize; this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs; +this.initialRebalanceDelayMs = initialRebalanceDelayMs; +this.newMemberJoinTimeoutMs = newMemberJoinTimeoutMs; +this.groupMinSessionTimeoutMs = groupMinSessionTimeoutMs; +this.groupMaxSessionTimeoutMs = groupMaxSessionTimeoutMs; +this.timer = timer; +this.time = time; +} + +/** + * When a new metadata image is pushed. + * + * @param metadataImage The new metadata image. + */ +public void onNewMetadataImage(MetadataImage metadataImage) { +this.metadataImage = metadataImage; } /** * Gets or maybe creates a consumer group. * * @param groupId The group id. + * @param groupType The group type (generic or consumer). * @param createIfNotExists A boolean indicating whether the group should be * created if it does not exist. * * @return A ConsumerGroup. + * @throws InvalidGroupIdException if the group id is invalid. 
 * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or * if the group is not a consumer group. * * Package private for testing. */ -ConsumerGroup getOrMaybeCreateConsumerGroup( +// Package private for testing. +Group getOrMaybeCreateGroup( Review Comment: I am not convinced. The downside is that it will be harder to guarantee the uniqueness of the group id. It also means that we would have to check both maps for all other operations (e.g. list, delete, etc.). I think that it would be better to keep them in a single map. For this particular case, we could just have two methods: `getOrMaybeCreateConsumerGroup` and `getOrMaybeCreateGenericGroup`. -- This is an