junrao commented on code in PR #17011:
URL: https://github.com/apache/kafka/pull/17011#discussion_r1755910251


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -17,58 +17,565 @@
 
 package org.apache.kafka.coordinator.share;
 
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
 import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
 import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
-import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
 import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorShard;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilder;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
+import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetricsShard;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.config.ShareCoordinatorConfig;
+import org.apache.kafka.server.group.share.PartitionFactory;
+import org.apache.kafka.server.group.share.SharePartitionKey;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
-/**
- * The share coordinator shard is a replicated state machine that manages the 
metadata of all
- * share partitions. It holds the hard and the soft state of the share 
partitions. This class
- * has two kinds of methods:
- * 1) The request handlers which handle the requests and generate a response 
and records to
- *    mutate the hard state. Those records will be written by the runtime and 
applied to the
- *    hard state via the replay methods.
- * 2) The replay methods which apply records to the hard state. Those are used 
in the request
- *    handling as well as during the initial loading of the records from the 
partitions.
- */
 public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord> {
+    private final Logger log;
+    private final ShareCoordinatorConfig config;
+    private final CoordinatorMetrics coordinatorMetrics;
+    private final CoordinatorMetricsShard metricsShard;
+    private final TimelineHashMap<SharePartitionKey, ShareGroupOffset> 
shareStateMap;  // coord key -> ShareGroupOffset
+    // leaderEpochMap can be updated by writeState call
+    // or if a newer leader makes a readState call.
+    private final TimelineHashMap<SharePartitionKey, Integer> leaderEpochMap;
+    private final TimelineHashMap<SharePartitionKey, Integer> 
snapshotUpdateCount;
+    private final TimelineHashMap<SharePartitionKey, Integer> stateEpochMap;
+    private MetadataImage metadataImage;
+
+    public static final Exception NULL_TOPIC_ID = new Exception("The topic id 
cannot be null.");
+    public static final Exception NEGATIVE_PARTITION_ID = new Exception("The 
partition id cannot be a negative number.");
+
+    public static class Builder implements 
CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> {
+        private ShareCoordinatorConfig config;
+        private LogContext logContext;
+        private SnapshotRegistry snapshotRegistry;
+        private CoordinatorMetrics coordinatorMetrics;
+        private TopicPartition topicPartition;
+
+        public Builder(ShareCoordinatorConfig config) {
+            this.config = config;
+        }
+
+        @Override
+        public CoordinatorShardBuilder<ShareCoordinatorShard, 
CoordinatorRecord> withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        @Override
+        public CoordinatorShardBuilder<ShareCoordinatorShard, 
CoordinatorRecord> withLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        @Override
+        public CoordinatorShardBuilder<ShareCoordinatorShard, 
CoordinatorRecord> withTime(Time time) {
+            // method is required due to interface
+            return this;
+        }
+
+        @Override
+        public CoordinatorShardBuilder<ShareCoordinatorShard, 
CoordinatorRecord> withTimer(CoordinatorTimer<Void, CoordinatorRecord> timer) {
+            // method is required due to interface
+            return this;
+        }
+
+        @Override
+        public CoordinatorShardBuilder<ShareCoordinatorShard, 
CoordinatorRecord> withCoordinatorMetrics(CoordinatorMetrics 
coordinatorMetrics) {
+            this.coordinatorMetrics = coordinatorMetrics;
+            return this;
+        }
+
+        @Override
+        public CoordinatorShardBuilder<ShareCoordinatorShard, 
CoordinatorRecord> withTopicPartition(TopicPartition topicPartition) {
+            this.topicPartition = topicPartition;
+            return this;
+        }
+
+        @Override
+        @SuppressWarnings("NPathComplexity")
+        public ShareCoordinatorShard build() {
+            if (logContext == null) logContext = new LogContext();
+            if (config == null)
+                throw new IllegalArgumentException("Config must be set.");
+            if (snapshotRegistry == null)
+                throw new IllegalArgumentException("SnapshotRegistry must be 
set.");
+            if (coordinatorMetrics == null || !(coordinatorMetrics instanceof 
ShareCoordinatorMetrics))
+                throw new IllegalArgumentException("CoordinatorMetrics must be 
set and be of type ShareCoordinatorMetrics.");
+            if (topicPartition == null)
+                throw new IllegalArgumentException("TopicPartition must be 
set.");
+
+            ShareCoordinatorMetricsShard metricsShard = 
((ShareCoordinatorMetrics) coordinatorMetrics)
+                .newMetricsShard(snapshotRegistry, topicPartition);
+
+            return new ShareCoordinatorShard(
+                logContext,
+                config,
+                coordinatorMetrics,
+                metricsShard,
+                snapshotRegistry
+            );
+        }
+    }
+
+    ShareCoordinatorShard(
+        LogContext logContext,
+        ShareCoordinatorConfig config,
+        CoordinatorMetrics coordinatorMetrics,
+        CoordinatorMetricsShard metricsShard,
+        SnapshotRegistry snapshotRegistry
+    ) {
+        this.log = logContext.logger(ShareCoordinatorShard.class);
+        this.config = config;
+        this.coordinatorMetrics = coordinatorMetrics;
+        this.metricsShard = metricsShard;
+        this.shareStateMap = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.leaderEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.snapshotUpdateCount = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.stateEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
+    }
+
     @Override
     public void onLoaded(MetadataImage newImage) {
-        CoordinatorShard.super.onLoaded(newImage);
+        this.metadataImage = newImage;
+        coordinatorMetrics.activateMetricsShard(metricsShard);
     }
 
     @Override
     public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
-        CoordinatorShard.super.onNewMetadataImage(newImage, delta);
+        this.metadataImage = newImage;
     }
 
     @Override
     public void onUnloaded() {
-        CoordinatorShard.super.onUnloaded();
+        coordinatorMetrics.deactivateMetricsShard(metricsShard);
     }
 
     @Override
     public void replay(long offset, long producerId, short producerEpoch, 
CoordinatorRecord record) throws RuntimeException {
+        ApiMessageAndVersion key = record.key();
+        ApiMessageAndVersion value = record.value();
+
+        switch (key.version()) {
+            case ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION: // 
ShareSnapshot
+                handleShareSnapshot((ShareSnapshotKey) key.message(), 
(ShareSnapshotValue) messageOrNull(value));
+                break;
+            case ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION: // 
ShareUpdate
+                handleShareUpdate((ShareUpdateKey) key.message(), 
(ShareUpdateValue) messageOrNull(value));
+                break;
+            default:
+                // noop
+        }
+    }
+
+    private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue 
value) {
+        SharePartitionKey mapKey = 
SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
+        maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
+        maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
+
+        ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
+        // this record is the complete snapshot
+        shareStateMap.put(mapKey, offsetRecord);
+        // if number of share updates is exceeded, then reset it
+        if (snapshotUpdateCount.containsKey(mapKey)) {
+            if (snapshotUpdateCount.get(mapKey) >= 
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
+                snapshotUpdateCount.put(mapKey, 0);
+            }
+        }
+    }
+
+    private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value) 
{
+        SharePartitionKey mapKey = 
SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
+        maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
+
+        // share update does not hold state epoch information.
+
+        ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
+        // this is an incremental snapshot
+        // so, we need to apply it to our current soft state
+        shareStateMap.compute(mapKey, (k, v) -> v == null ? offsetRecord : 
merge(v, value));
+        snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 0 : v + 1);
+    }
+
+    private void maybeUpdateLeaderEpochMap(SharePartitionKey mapKey, int 
leaderEpoch) {
+        leaderEpochMap.putIfAbsent(mapKey, leaderEpoch);
+        if (leaderEpochMap.get(mapKey) < leaderEpoch) {
+            leaderEpochMap.put(mapKey, leaderEpoch);
+        }
+    }
+
+    private void maybeUpdateStateEpochMap(SharePartitionKey mapKey, int 
stateEpoch) {
+        stateEpochMap.putIfAbsent(mapKey, stateEpoch);
+        if (stateEpochMap.get(mapKey) < stateEpoch) {
+            stateEpochMap.put(mapKey, stateEpoch);
+        }
     }
 
     @Override
     public void replayEndTransactionMarker(long producerId, short 
producerEpoch, TransactionResult result) throws RuntimeException {
         CoordinatorShard.super.replayEndTransactionMarker(producerId, 
producerEpoch, result);
     }
 
-    public CoordinatorResult<WriteShareGroupStateResponseData, 
CoordinatorRecord> writeState(RequestContext context, 
WriteShareGroupStateRequestData request) {
-        throw new RuntimeException("Not implemented");
+    /**
+     * This method generates the ShareSnapshotValue record corresponding to 
the requested topic partition information.
+     * The generated record is then written to the __share_group_state topic 
and replayed to the in-memory state
+     * of the coordinator shard, shareStateMap, by CoordinatorRuntime.
+     * <p>
+     * This method as called by the ShareCoordinatorService will be provided 
with
+     * the request data which covers only a single key i.e. 
group1:topic1:partition1. The implementation
+     * below was done keeping this in mind.
+     *
+     * @param request - WriteShareGroupStateRequestData for a single key
+     * @return CoordinatorResult(records, response)
+     */
+    @SuppressWarnings("NPathComplexity")
+    public CoordinatorResult<WriteShareGroupStateResponseData, 
CoordinatorRecord> writeState(
+        WriteShareGroupStateRequestData request
+    ) {
+        // records to write (with both key and value of snapshot type), 
response to caller
+        // only one key will be there in the request by design
+        
metricsShard.record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+        Optional<CoordinatorResult<WriteShareGroupStateResponseData, 
CoordinatorRecord>> error = maybeGetWriteStateError(request);
+        if (error.isPresent()) {
+            return error.get();
+        }
+
+        String groupId = request.groupId();
+        WriteShareGroupStateRequestData.WriteStateData topicData = 
request.topics().get(0);
+        WriteShareGroupStateRequestData.PartitionData partitionData = 
topicData.partitions().get(0);
+
+        SharePartitionKey key = SharePartitionKey.getInstance(groupId, 
topicData.topicId(), partitionData.partition());
+        List<CoordinatorRecord> recordList;
+
+        if (!shareStateMap.containsKey(key)) {
+            // since this is the first time we are getting a write request, we 
should be creating a share snapshot record
+            recordList = 
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+                groupId, topicData.topicId(), partitionData.partition(), 
ShareGroupOffset.fromRequest(partitionData)
+            ));
+        } else if (snapshotUpdateCount.getOrDefault(key, 0) >= 
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
+            int newLeaderEpoch = partitionData.leaderEpoch() == -1 ? 
shareStateMap.get(key).leaderEpoch() : partitionData.leaderEpoch();
+            int newStateEpoch = partitionData.stateEpoch() == -1 ? 
shareStateMap.get(key).stateEpoch() : partitionData.stateEpoch();
+            long newStartOffset = partitionData.startOffset() == -1 ? 
shareStateMap.get(key).startOffset() : partitionData.startOffset();
+
+            // Since the number of update records for this share part key 
exceeds snapshotUpdateRecordsPerSnapshot,
+            // we should be creating a share snapshot record.
+            List<PersisterOffsetsStateBatch> batchesToAdd = 
combineStateBatches(
+                shareStateMap.get(key).stateBatchAsSet(),
+                partitionData.stateBatches().stream()
+                    .map(PersisterOffsetsStateBatch::from)
+                    .collect(Collectors.toCollection(LinkedHashSet::new)),
+                newStartOffset);
+
+            recordList = 
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+                groupId, topicData.topicId(), partitionData.partition(),
+                new ShareGroupOffset.Builder()
+                    .setStartOffset(newStartOffset)
+                    .setLeaderEpoch(newLeaderEpoch)
+                    .setStateEpoch(newStateEpoch)
+                    .setStateBatches(batchesToAdd)
+                    .build()));
+        } else {
+            // share snapshot is present and number of share snapshot update 
records < snapshotUpdateRecordsPerSnapshot
+            recordList = 
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+                groupId, topicData.topicId(), partitionData.partition(), 
ShareGroupOffset.fromRequest(partitionData, 
shareStateMap.get(key).snapshotEpoch())
+            ));
+        }
+
+        List<CoordinatorRecord> validRecords = new LinkedList<>();
+
+        WriteShareGroupStateResponseData responseData = new 
WriteShareGroupStateResponseData();
+        for (CoordinatorRecord record : recordList) {  // should be single 
record
+            if (!(record.key().message() instanceof ShareSnapshotKey) && 
!(record.key().message() instanceof ShareUpdateKey)) {
+                continue;
+            }
+            SharePartitionKey mapKey = null;
+            boolean shouldIncSnapshotEpoch = false;
+            if (record.key().message() instanceof ShareSnapshotKey) {
+                ShareSnapshotKey recordKey = (ShareSnapshotKey) 
record.key().message();
+                
responseData.setResults(Collections.singletonList(WriteShareGroupStateResponse.toResponseWriteStateResult(
+                    recordKey.topicId(), 
Collections.singletonList(WriteShareGroupStateResponse.toResponsePartitionResult(
+                        recordKey.partition())))));
+                mapKey = SharePartitionKey.getInstance(recordKey.groupId(), 
recordKey.topicId(), recordKey.partition());
+                shouldIncSnapshotEpoch = true;
+            } else if (record.key().message() instanceof ShareUpdateKey) {
+                ShareUpdateKey recordKey = (ShareUpdateKey) 
record.key().message();
+                
responseData.setResults(Collections.singletonList(WriteShareGroupStateResponse.toResponseWriteStateResult(
+                    recordKey.topicId(), 
Collections.singletonList(WriteShareGroupStateResponse.toResponsePartitionResult(
+                        recordKey.partition())))));
+                mapKey = SharePartitionKey.getInstance(recordKey.groupId(), 
recordKey.topicId(), recordKey.partition());
+            }
+
+            if (shareStateMap.containsKey(mapKey) && shouldIncSnapshotEpoch) {
+                ShareGroupOffset oldValue = shareStateMap.get(mapKey);
+                ((ShareSnapshotValue) 
record.value().message()).setSnapshotEpoch(oldValue.snapshotEpoch() + 1);  // 
increment the snapshot epoch
+            }
+            validRecords.add(record); // this will have updated snapshot epoch 
and on replay the value will trickle down to the map
+        }
+
+        return new CoordinatorResult<>(validRecords, responseData);
     }
 
+    /**
+     * This method finds the ShareSnapshotValue record corresponding to the 
requested topic partition from the
+     * in-memory state of coordinator shard, the shareStateMap.
+     * <p>
+     * This method as called by the ShareCoordinatorService will be provided 
with
+     * the request data which covers only key i.e. group1:topic1:partition1. 
The implementation
+     * below was done keeping this in mind.
+     *
+     * @param request - WriteShareGroupStateRequestData for a single key
+     * @param offset  - offset to read from the __share_group_state topic 
partition
+     * @return CoordinatorResult(records, response)
+     */
     public ReadShareGroupStateResponseData 
readState(ReadShareGroupStateRequestData request, Long offset) {
-        throw new RuntimeException("Not implemented");
+        // records to read (with the key of snapshot type), response to caller
+        // only one key will be there in the request by design
+        Optional<ReadShareGroupStateResponseData> error = 
maybeGetReadStateError(request, offset);
+        if (error.isPresent()) {
+            return error.get();
+        }
+
+        Uuid topicId = request.topics().get(0).topicId();
+        int partition = 
request.topics().get(0).partitions().get(0).partition();
+        int leaderEpoch = 
request.topics().get(0).partitions().get(0).leaderEpoch();
+
+        SharePartitionKey coordinatorKey = 
SharePartitionKey.getInstance(request.groupId(), topicId, partition);
+
+        if (!shareStateMap.containsKey(coordinatorKey)) {
+            return ReadShareGroupStateResponse.toResponseData(
+                topicId,
+                partition,
+                PartitionFactory.DEFAULT_START_OFFSET,
+                PartitionFactory.DEFAULT_STATE_EPOCH,
+                Collections.emptyList()
+            );
+        }
+
+        ShareGroupOffset offsetValue = shareStateMap.get(coordinatorKey, 
offset);
+
+        if (offsetValue == null) {
+            // Returning an error response as the snapshot value was not found
+            return ReadShareGroupStateResponse.toErrorResponseData(
+                topicId,
+                partition,
+                Errors.UNKNOWN_SERVER_ERROR,
+                "Data not found for topic {}, partition {} for group {}, in 
the in-memory state of share coordinator"
+            );
+        }
+
+        List<ReadShareGroupStateResponseData.StateBatch> stateBatches = 
(offsetValue.stateBatches() != null && !offsetValue.stateBatches().isEmpty()) ?
+            offsetValue.stateBatches().stream().map(
+                stateBatch -> new ReadShareGroupStateResponseData.StateBatch()
+                    .setFirstOffset(stateBatch.firstOffset())
+                    .setLastOffset(stateBatch.lastOffset())
+                    .setDeliveryState(stateBatch.deliveryState())
+                    .setDeliveryCount(stateBatch.deliveryCount())
+            ).collect(java.util.stream.Collectors.toList()) : 
Collections.emptyList();
+
+        // Updating the leader map with the new leader epoch
+        leaderEpochMap.put(coordinatorKey, leaderEpoch);
+
+        // Returning the successfully retrieved snapshot value
+        return ReadShareGroupStateResponse.toResponseData(topicId, partition, 
offsetValue.startOffset(), offsetValue.stateEpoch(), stateBatches);
+    }
+
+    private Optional<CoordinatorResult<WriteShareGroupStateResponseData, 
CoordinatorRecord>> maybeGetWriteStateError(
+        WriteShareGroupStateRequestData request
+    ) {
+        String groupId = request.groupId();
+        WriteShareGroupStateRequestData.WriteStateData topicData = 
request.topics().get(0);
+        WriteShareGroupStateRequestData.PartitionData partitionData = 
topicData.partitions().get(0);
+
+        Uuid topicId = topicData.topicId();
+        int partitionId = partitionData.partition();
+
+        if (topicId == null) {
+            return Optional.of(getWriteErrorResponse(Errors.INVALID_REQUEST, 
NULL_TOPIC_ID, null, partitionId));
+        }
+
+        if (partitionId < 0) {
+            return Optional.of(getWriteErrorResponse(Errors.INVALID_REQUEST, 
NEGATIVE_PARTITION_ID, topicId, partitionId));
+        }
+
+        SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, 
topicId, partitionId);
+        if (leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > 
partitionData.leaderEpoch()) {
+            log.error("Request leader epoch smaller than last recorded.");
+            return 
Optional.of(getWriteErrorResponse(Errors.FENCED_LEADER_EPOCH, null, topicId, 
partitionId));
+        }
+        if (stateEpochMap.containsKey(mapKey) && stateEpochMap.get(mapKey) > 
partitionData.stateEpoch()) {
+            log.error("Request state epoch smaller than last recorded.");
+            return 
Optional.of(getWriteErrorResponse(Errors.FENCED_STATE_EPOCH, null, topicId, 
partitionId));
+        }
+        if (metadataImage == null) {
+            log.error("Metadata image is null");
+            return 
Optional.of(getWriteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, 
topicId, partitionId));
+        }
+        if (metadataImage.topics().getTopic(topicId) == null ||
+            metadataImage.topics().getPartition(topicId, partitionId) == null) 
{
+            log.error("Topic/TopicPartition not found in metadata image.");
+            return 
Optional.of(getWriteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, 
topicId, partitionId));
+        }
+
+        return Optional.empty();
+    }
+
+    private Optional<ReadShareGroupStateResponseData> 
maybeGetReadStateError(ReadShareGroupStateRequestData request, Long offset) {
+        String groupId = request.groupId();
+        ReadShareGroupStateRequestData.ReadStateData topicData = 
request.topics().get(0);
+        ReadShareGroupStateRequestData.PartitionData partitionData = 
topicData.partitions().get(0);
+
+        Uuid topicId = topicData.topicId();
+        int partitionId = partitionData.partition();
+
+        if (topicId == null) {
+            log.error("Request topic id is null.");
+            return Optional.of(ReadShareGroupStateResponse.toErrorResponseData(
+                null, partitionId, Errors.INVALID_REQUEST, 
NULL_TOPIC_ID.getMessage()));
+        }
+
+        if (partitionId < 0) {
+            log.error("Request partition id is negative.");
+            return Optional.of(ReadShareGroupStateResponse.toErrorResponseData(
+                topicId, partitionId, Errors.INVALID_REQUEST, 
NEGATIVE_PARTITION_ID.getMessage()));
+        }
+
+        SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, 
topicId, partitionId);
+        if (leaderEpochMap.containsKey(mapKey, offset) && 
leaderEpochMap.get(mapKey, offset) > partitionData.leaderEpoch()) {
+            log.error("Request leader epoch id is smaller than last 
recorded.");
+            return 
Optional.of(ReadShareGroupStateResponse.toErrorResponseData(topicId, 
partitionId, Errors.FENCED_LEADER_EPOCH, Errors.FENCED_LEADER_EPOCH.message()));
+        }
+
+        if (metadataImage == null) {
+            log.error("Metadata image is null");
+            return 
Optional.of(ReadShareGroupStateResponse.toErrorResponseData(topicId, 
partitionId, Errors.UNKNOWN_TOPIC_OR_PARTITION, 
Errors.UNKNOWN_TOPIC_OR_PARTITION.message()));
+        }
+
+        if (metadataImage.topics().getTopic(topicId) == null ||
+            metadataImage.topics().getPartition(topicId, partitionId) == null) 
{
+            log.error("Topic/TopicPartition not found in metadata image.");
+            return 
Optional.of(ReadShareGroupStateResponse.toErrorResponseData(topicId, 
partitionId, Errors.UNKNOWN_TOPIC_OR_PARTITION, 
Errors.UNKNOWN_TOPIC_OR_PARTITION.message()));
+        }
+
+        return Optional.empty();
+    }
+
+    private CoordinatorResult<WriteShareGroupStateResponseData, 
CoordinatorRecord> getWriteErrorResponse(
+        Errors error,
+        Exception exception,
+        Uuid topicId,
+        int partitionId
+    ) {
+        String message = exception == null ? error.message() : 
exception.getMessage();
+        WriteShareGroupStateResponseData responseData = 
WriteShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error, 
message);
+        return new CoordinatorResult<>(Collections.emptyList(), responseData);
+    }
+
+    // Visible for testing
+    Integer getLeaderMapValue(SharePartitionKey key) {
+        return this.leaderEpochMap.get(key);
+    }
+
+    // Visible for testing
+    Integer getStateEpochMapValue(SharePartitionKey key) {
+        return this.stateEpochMap.get(key);
+    }
+
+    // Visible for testing
+    ShareGroupOffset getShareStateMapValue(SharePartitionKey key) {
+        return this.shareStateMap.get(key);
+    }
+
+    // Visible for testing
+    CoordinatorMetricsShard getMetricsShard() {
+        return metricsShard;
+    }
+
+    private static ShareGroupOffset merge(ShareGroupOffset soFar, 
ShareUpdateValue newData) {
+        // snapshot epoch should be same as last share snapshot
+        // state epoch is not present
+        Set<PersisterOffsetsStateBatch> currentBatches = 
soFar.stateBatchAsSet();
+        long newStartOffset = newData.startOffset() == -1 ? 
soFar.startOffset() : newData.startOffset();
+        int newLeaderEpoch = newData.leaderEpoch() == -1 ? soFar.leaderEpoch() 
: newData.leaderEpoch();
+
+        return new ShareGroupOffset.Builder()
+            .setSnapshotEpoch(soFar.snapshotEpoch())
+            .setStateEpoch(soFar.stateEpoch())
+            .setStartOffset(newStartOffset)
+            .setLeaderEpoch(newLeaderEpoch)
+            .setStateBatches(combineStateBatches(currentBatches, 
newData.stateBatches().stream()
+                .map(PersisterOffsetsStateBatch::from)
+                .collect(Collectors.toCollection(LinkedHashSet::new)), 
newStartOffset))
+            .build();
+    }
+
+    /**
+     * Util method which takes in 2 collections containing {@link 
PersisterOffsetsStateBatch}
+     * and the startOffset.
+     * It removes all batches from the 1st collection which have the same 
first and last offset
+     * as the batches in 2nd collection. It then creates a final list of 
batches which contains the
+     * former result and all the batches in the 2nd collection. In set 
notation (A - B) U B (we prefer batches in B
+     * which have same first and last offset in A).
+     * Finally, it removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
+     * @param currentBatch - collection containing current soft state of 
batches
+     * @param newBatch - collection containing batches in incoming request
+     * @param startOffset - startOffset to consider when removing old batches.
+     * @return List containing combined batches
+     */
+    private static List<PersisterOffsetsStateBatch> combineStateBatches(
+        Collection<PersisterOffsetsStateBatch> currentBatch,
+        Collection<PersisterOffsetsStateBatch> newBatch,
+        long startOffset
+    ) {
+        currentBatch.removeAll(newBatch);
+        List<PersisterOffsetsStateBatch> batchesToAdd = new 
LinkedList<>(currentBatch);
+        batchesToAdd.addAll(newBatch);

Review Comment:
   It would be useful to verify if this is an issue. Suppose currentBatch is
   batch1 {
   firstOffset: 100
   lastOffset: 109
   deliverState: Available
   deliverCount: 1
   }
   and newBatch is 
   batch2{
   firstOffset: 105
   lastOffset: 105
   deliverState: Acknowledge
   deliverCount: 1
   }
   
   After the call to combineStateBatches(), Share coordinator will have both 
batches in its state and thus the share leader could have both batches too 
(after initializing from ReadShareGroupState). Now suppose that the share 
leader fetches the following batch and calls SharePartition.acquire().
   
   fetchedBatch{
   firstOffset: 100
   lastOffset: 109
   }
   
   Both batch1 and batch2 will match the fetched batch. When calling 
acquireSubsetBatchRecords() on batch1, we will add the full batch to 
AcquiredRecords. When calling acquireSubsetBatchRecords() on batch2, we will 
skip since the only record in it has been acked. But AcquiredRecords is 
unchanged after this. This means that we will return the full batch as acquired 
records, which is incorrect since offset 105 shouldn't be acquired.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to