[GitHub] [kafka] clolov commented on a diff in pull request #13538: KAFKA-14462; [8/N] Add ConsumerGroupMember
clolov commented on code in PR #13538: URL: https://github.com/apache/kafka/pull/13538#discussion_r1175040697 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * ConsumerGroupMember contains all the information related to a member + * within a consumer group. This class is immutable and is fully backed + * by records stored in the __consumer_offsets topic. + */ +public class ConsumerGroupMember { +/** + * A builder allowing to create a new member or update an + * existing one. + * + * Please refer to the javadoc of {{@link ConsumerGroupMember}} for the + * definition of the fields. + */ +public static class Builder { +private final String memberId; +private int memberEpoch = 0; +private int previousMemberEpoch = -1; +private int nextMemberEpoch = 0; +private String instanceId = null; +private String rackId = null; +private int rebalanceTimeoutMs = -1; +private String clientId = ""; +private String clientHost = ""; +private List subscribedTopicNames = Collections.emptyList(); +private String subscribedTopicRegex = ""; +private String serverAssignorName = null; +private List clientAssignors = Collections.emptyList(); +private Map> assignedPartitions = Collections.emptyMap(); +private Map> partitionsPendingRevocation = Collections.emptyMap(); +private Map> partitionsPendingAssignment = Collections.emptyMap(); + +public Builder(String memberId) { +this.memberId = Objects.requireNonNull(memberId); +} + +public Builder(ConsumerGroupMember member) { +Objects.requireNonNull(member); + +this.memberId = member.memberId; +this.memberEpoch = member.memberEpoch; +this.previousMemberEpoch = member.previousMemberEpoch; +this.nextMemberEpoch = member.nextMemberEpoch; +this.instanceId = member.instanceId; +this.rackId = member.rackId; +this.rebalanceTimeoutMs = member.rebalanceTimeoutMs; +this.clientId = member.clientId; +this.clientHost = member.clientHost; +this.subscribedTopicNames = member.subscribedTopicNames; +this.subscribedTopicRegex = member.subscribedTopicRegex; +this.serverAssignorName = member.serverAssignorName; +this.clientAssignors = member.clientAssignors; +this.assignedPartitions = member.assignedPartitions; +this.partitionsPendingRevocation = member.partitionsPendingRevocation; +this.partitionsPendingAssignment = member.partitionsPendingAssignment; +} + +public Builder setMemberEpoch(int memberEpoch) { +this.memberEpoch = memberEpoch; +return this; +} + +public Builder setPreviousMemberEpoch(int previousMemberEpoch) { +this.previousMemberEpoch = previousMemberEpoch; +return this; +} + +public Builder setNextMemberEpoch(int nextMemberEpoch) { +this.nextMemberEpoch = nextMemberEpoch; +return this; +} + +public Builder setInstanceId(String instanceId) { +this.instanceId = instanceId; +return this; +} + +public Builder maybeUpdateInstanceId(Optional instanceId) { +this.instanceId = instanceId.orElse(this.instanceId); +return this; +} + +public
[GitHub] [kafka] clolov commented on a diff in pull request #13538: KAFKA-14462; [8/N] Add ConsumerGroupMember
clolov commented on code in PR #13538: URL: https://github.com/apache/kafka/pull/13538#discussion_r1165400494 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * ConsumerGroupMember contains all the information related to a member + * within a consumer group. This class is immutable and is fully backed + * by records stored in the __consumer_offsets topic. + */ +public class ConsumerGroupMember { +/** + * A builder allowing to create a new member or update an + * existing one. + * + * Please refer to the javadoc of {{@link ConsumerGroupMember}} for the + * definition of the fields. + */ +public static class Builder { +private final String memberId; +private int memberEpoch = 0; +private int previousMemberEpoch = -1; +private int nextMemberEpoch = 0; +private String instanceId = null; +private String rackId = null; +private int rebalanceTimeoutMs = -1; +private String clientId = ""; +private String clientHost = ""; +private List subscribedTopicNames = Collections.emptyList(); +private String subscribedTopicRegex = ""; +private String serverAssignorName = null; +private List clientAssignors = Collections.emptyList(); +private Map> assignedPartitions = Collections.emptyMap(); +private Map> partitionsPendingRevocation = Collections.emptyMap(); +private Map> partitionsPendingAssignment = Collections.emptyMap(); + +public Builder(String memberId) { +this.memberId = Objects.requireNonNull(memberId); +} + +public Builder(ConsumerGroupMember member) { +Objects.requireNonNull(member); + +this.memberId = member.memberId; +this.memberEpoch = member.memberEpoch; +this.previousMemberEpoch = member.previousMemberEpoch; +this.nextMemberEpoch = member.nextMemberEpoch; +this.instanceId = member.instanceId; +this.rackId = member.rackId; +this.rebalanceTimeoutMs = member.rebalanceTimeoutMs; +this.clientId = member.clientId; +this.clientHost = member.clientHost; +this.subscribedTopicNames = member.subscribedTopicNames; +this.subscribedTopicRegex = member.subscribedTopicRegex; +this.serverAssignorName = member.serverAssignorName; +this.clientAssignors = member.clientAssignors; +this.assignedPartitions = member.assignedPartitions; +this.partitionsPendingRevocation = member.partitionsPendingRevocation; +this.partitionsPendingAssignment = member.partitionsPendingAssignment; +} + +public Builder setMemberEpoch(int memberEpoch) { +this.memberEpoch = memberEpoch; +return this; +} + +public Builder setPreviousMemberEpoch(int previousMemberEpoch) { +this.previousMemberEpoch = previousMemberEpoch; +return this; +} + +public Builder setNextMemberEpoch(int nextMemberEpoch) { +this.nextMemberEpoch = nextMemberEpoch; +return this; +} + +public Builder setInstanceId(String instanceId) { +this.instanceId = instanceId; +return this; +} + +public Builder maybeUpdateInstanceId(Optional instanceId) { +this.instanceId = instanceId.orElse(this.instanceId); +return this; +} + +public