dajac commented on code in PR #17549:
URL: https://github.com/apache/kafka/pull/17549#discussion_r1819075308
##########
clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java:
##########
@@ -37,6 +37,14 @@ public class ConsumerGroupHeartbeatRequest extends
AbstractRequest {
*/
public static final int JOIN_GROUP_MEMBER_EPOCH = 0;
+ /**
+ * The version from which consumers are required to generate their own
memberId.
Review Comment:
nit: Let's use `member id` here and below.
##########
clients/src/main/resources/common/message/DescribeGroupsResponse.json:
##########
@@ -48,7 +48,7 @@
{ "name": "Members", "type": "[]DescribedGroupMember", "versions": "0+",
"about": "The group members.", "fields": [
{ "name": "MemberId", "type": "string", "versions": "0+",
- "about": "The member ID assigned by the group coordinator." },
+ "about": "The member ID is either assigned by the group coordinator
or generated by consumer itself." },
Review Comment:
nit: I would simplify it to `The member id`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1293,21 +1294,28 @@ private void throwIfNull(
* Validates the request.
*
* @param request The request to validate.
- *
+ * @param apiVersion The version of ConsumerGroupHeartbeat RPC
* @throws InvalidRequestException if the request is not valid.
* @throws UnsupportedAssignorException if the assignor is not supported.
*/
private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
- ConsumerGroupHeartbeatRequestData request
+ ConsumerGroupHeartbeatRequestData request,
+ short apiVersion
) throws InvalidRequestException, UnsupportedAssignorException {
+ if (apiVersion >= CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION ||
+ request.memberEpoch() > 0 ||
+ request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH
+ ) {
+ throwIfNull(request.memberId(), "MemberId can't be null.");
Review Comment:
Can it be null? It does not seem to be possible from the RPC definition.
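
For reference, the gating condition in the hunk above can be sketched in isolation as follows. This is a minimal illustration, not the Kafka implementation: the class name, the constant values (1 for the required version, -1 for the leave epoch) and the exception type are assumptions. It also assumes that, if the field cannot be null on the wire, the check that matters is for an empty string.

```java
final class MemberIdValidationSketch {
    // Assumed values for illustration only; the real constants are defined in Kafka.
    static final int REQUIRED_VERSION = 1;
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;

    /** Returns true when a request with this version and epoch must carry a member id. */
    static boolean memberIdRequired(short apiVersion, int memberEpoch) {
        return apiVersion >= REQUIRED_VERSION
            || memberEpoch > 0
            || memberEpoch == LEAVE_GROUP_MEMBER_EPOCH;
    }

    /** Rejects a missing or empty member id when one is required. */
    static void validate(short apiVersion, int memberEpoch, String memberId) {
        if (memberIdRequired(apiVersion, memberEpoch)
                && (memberId == null || memberId.isEmpty())) {
            throw new IllegalArgumentException("MemberId can't be empty.");
        }
    }
}
```

Under these assumptions, a version 0 join (epoch 0) is the only case in which the member id may be omitted.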
##########
clients/src/main/resources/common/message/ShareGroupHeartbeatRequest.json:
##########
@@ -28,7 +28,7 @@
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType":
"groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+",
- "about": "The member ID generated by the coordinator. The member ID must
be kept during the entire lifetime of the member." },
+ "about": "The member ID generated by the consumer. The member ID must be
kept during the entire lifetime of the consumer process." },
Review Comment:
ditto.
##########
core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala:
##########
@@ -82,15 +84,17 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance)
extends GroupCoordinator
// We test DeleteGroups on empty and non-empty groups. Here we create the
non-empty group.
joinConsumerGroup(
groupId = "grp-non-empty",
- useNewProtocol = useNewProtocol
+ useNewProtocol = useNewProtocol,
+ memberId = Uuid.randomUuid.toString
Review Comment:
It is a bit weird to have the member id here because the classic protocol
does not use it. I wonder if we should just hide this detail and generate the
member id within `joinConsumerGroup`. What do you think?
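
One possible shape for this, sketched in plain Java rather than in the Scala test code: the helper decides internally whether a member id is needed, so callers only pass the protocol flag. The class and method names are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid` to keep the example self-contained.

```java
import java.util.Optional;
import java.util.UUID;

final class JoinHelperSketch {
    /**
     * Hypothetical stand-in for the test helper: generates a member id
     * only when the new consumer protocol is used.
     */
    static Optional<String> memberIdFor(boolean useNewProtocol) {
        return useNewProtocol
            ? Optional.of(UUID.randomUUID().toString())
            : Optional.empty();
    }
}
```

With this shape, a test for the classic protocol never has to mention the member id at all.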
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -944,23 +941,20 @@ public boolean sameRequest(final OffsetFetchRequestState
request) {
public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
OffsetFetchRequest.Builder builder;
- if (memberInfo.memberId.isPresent() &&
memberInfo.memberEpoch.isPresent()) {
- builder = new OffsetFetchRequest.Builder(
- groupId,
- memberInfo.memberId.get(),
- memberInfo.memberEpoch.get(),
- true,
- new ArrayList<>(this.requestedPartitions),
- throwOnFetchStableOffsetUnsupported);
- } else {
- // Building request without passing member ID/epoch to leave
the logic to choose
- // default values when not present on the request builder.
- builder = new OffsetFetchRequest.Builder(
- groupId,
- true,
- new ArrayList<>(this.requestedPartitions),
- throwOnFetchStableOffsetUnsupported);
- }
+ // Building request without passing member ID/epoch to leave the
logic to choose
+ // default values when not present on the request builder.
+ builder = memberInfo.memberEpoch.map(epoch -> new
OffsetFetchRequest.Builder(
+ groupId,
+ memberInfo.memberId,
+ epoch,
+ true,
+ new ArrayList<>(this.requestedPartitions),
+ throwOnFetchStableOffsetUnsupported))
+ .orElseGet(() -> new OffsetFetchRequest.Builder(
+ groupId,
+ true,
+ new ArrayList<>(this.requestedPartitions),
+ throwOnFetchStableOffsetUnsupported));
Review Comment:
For my understanding, this means that we only set the member id when we have
a valid epoch too. In other words, we don't set the member id if we are not
actively part of a group. Is my understanding correct?
I wonder if we should do the same for the groupMetadata in order to be
consistent. What do you think?
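
To illustrate the behavior being asked about, here is a minimal sketch of the `map(...).orElseGet(...)` pattern from the hunk above. The `Builder` class is a hypothetical, simplified stand-in for `OffsetFetchRequest.Builder`; it only records whether the member information was passed.

```java
import java.util.Optional;

final class OffsetFetchBuilderSketch {
    /** Hypothetical simplified builder that records which constructor was used. */
    static final class Builder {
        final String groupId;
        final Optional<String> memberId;
        final Optional<Integer> memberEpoch;

        Builder(String groupId, String memberId, int memberEpoch) {
            this.groupId = groupId;
            this.memberId = Optional.of(memberId);
            this.memberEpoch = Optional.of(memberEpoch);
        }

        Builder(String groupId) {
            this.groupId = groupId;
            this.memberId = Optional.empty();
            this.memberEpoch = Optional.empty();
        }
    }

    /** The member id is only passed on when an epoch is present. */
    static Builder build(String groupId, String memberId, Optional<Integer> memberEpoch) {
        return memberEpoch
            .map(epoch -> new Builder(groupId, memberId, epoch))
            .orElseGet(() -> new Builder(groupId));
    }
}
```

In this sketch, an empty epoch drops the member id even though one is always available, which matches the reading described in the comment above.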
##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##########
@@ -39,7 +39,7 @@
{ "name": "ErrorMessage", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
- "about": "The member id generated by the coordinator. Only provided when
the member joins with MemberEpoch == 0." },
+ "about": "The member id is generated by the consumer and provided by the
consumer for all requests." },
Review Comment:
I wonder if we should specify that in version 0 it is generated by the
group coordinator, whereas in version 1+ it is basically the value received
from the consumer.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1293,21 +1294,28 @@ private void throwIfNull(
* Validates the request.
*
* @param request The request to validate.
- *
+ * @param apiVersion The version of ConsumerGroupHeartbeat RPC
* @throws InvalidRequestException if the request is not valid.
* @throws UnsupportedAssignorException if the assignor is not supported.
*/
private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
- ConsumerGroupHeartbeatRequestData request
+ ConsumerGroupHeartbeatRequestData request,
+ short apiVersion
Review Comment:
nit: The indentation is incorrect. It should be 4 spaces.
##########
clients/src/main/resources/common/message/OffsetFetchRequest.json:
##########
@@ -54,7 +54,7 @@
{ "name": "GroupId", "type": "string", "versions": "8+", "entityType":
"groupId",
"about": "The group ID."},
{ "name": "MemberId", "type": "string", "versions": "9+",
"nullableVersions": "9+", "default": "null", "ignorable": true,
- "about": "The member ID assigned by the group coordinator if using the
new consumer protocol (KIP-848)." },
+ "about": "The member ID generated by the consumer if using the new
consumer protocol (KIP-848, KIP-1082)." },
Review Comment:
ditto.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10226,48 +10256,48 @@ public void
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
.build();
List<CoordinatorRecord> expectedRecords = Arrays.asList(
- // The existing classic group tombstone.
-
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId),
-
- // Create the new consumer group with the static member.
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedClassicMember),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
0),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, expectedClassicMember.assignedPartitions()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
0),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedClassicMember),
-
- // Remove the static member because the rejoining member replaces
it.
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
+ // The existing classic group tombstone.
Review Comment:
Are there any changes in this block?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -281,13 +294,17 @@ public void testMemberIdGeneration() {
));
CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("group-foo")
- .setMemberEpoch(0)
- .setServerAssignor("range")
- .setRebalanceTimeoutMs(5000)
- .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
- .setTopicPartitions(Collections.emptyList()));
+ // The consumer generates its own Member ID starting from
version 1 of the ConsumerGroupHeartbeat RPC.
+ // Therefore, this test case is specific to earlier versions
of the RPC.
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("group-foo")
+ .setMemberEpoch(0)
+ .setServerAssignor("range")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList()),
+ (short) (CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION - 1)
Review Comment:
nit: Let's use `0`. It is more explicit.
##########
core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala:
##########
@@ -242,50 +243,54 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
assignment = assignment(List(0, 1, 2))
)
- // The joining request with a consumer group member 2 is accepted.
- val memberId2 = consumerGroupHeartbeat(
- groupId = groupId,
- rebalanceTimeoutMs = 5 * 60 * 1000,
- subscribedTopicNames = List("foo"),
- topicPartitions = List.empty,
- expectedError = Errors.NONE
- ).memberId
-
- // The group has become a consumer group.
- assertEquals(
- List(
- new ListGroupsResponseData.ListedGroup()
- .setGroupId(groupId)
- .setProtocolType("consumer")
- .setGroupState(ConsumerGroupState.RECONCILING.toString)
- .setGroupType(Group.GroupType.CONSUMER.toString)
- ),
- listGroups(
- statesFilter = List.empty,
- typesFilter = List(Group.GroupType.CONSUMER.toString)
+ for (version <- ApiKeys.CONSUMER_GROUP_HEARTBEAT.oldestVersion() to
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled)) {
Review Comment:
Testing all the versions in this suite does not seem necessary. We can focus
on testing the latest one. However, let's ensure that we test all the versions in
ConsumerGroupHeartbeatRequestTest.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1335,23 +1342,22 @@ private void
throwIfConsumerGroupHeartbeatRequestIsInvalid(
* Validates the ShareGroupHeartbeat request.
*
* @param request The request to validate.
- *
* @throws InvalidRequestException if the request is not valid.
* @throws UnsupportedAssignorException if the assignor is not supported.
*/
private void throwIfShareGroupHeartbeatRequestIsInvalid(
ShareGroupHeartbeatRequestData request
) throws InvalidRequestException, UnsupportedAssignorException {
+ throwIfNull(request.memberId(), "MemberId can't be null.");
Review Comment:
ditto. I am not sure that it is necessary.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -281,13 +294,17 @@ public void testMemberIdGeneration() {
));
CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("group-foo")
- .setMemberEpoch(0)
- .setServerAssignor("range")
- .setRebalanceTimeoutMs(5000)
- .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
- .setTopicPartitions(Collections.emptyList()));
+ // The consumer generates its own Member ID starting from
version 1 of the ConsumerGroupHeartbeat RPC.
+ // Therefore, this test case is specific to earlier versions
of the RPC.
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("group-foo")
+ .setMemberEpoch(0)
+ .setServerAssignor("range")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList()),
+ (short) (CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION - 1)
Review Comment:
nit: Indentation seems incorrect. There are other cases in this file. I'll let
you check them.
##########
core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala:
##########
@@ -36,6 +37,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance)
extends GroupCoordinator
)
)
def testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator():
Unit = {
+ // TODO fix
Review Comment:
Is it still relevant?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.