omkreddy commented on code in PR #16263:
URL: https://github.com/apache/kafka/pull/16263#discussion_r1637678486


##########
core/src/main/java/kafka/server/share/FinalContext.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.share;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ShareFetchResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ShareFetchResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+
+/**
+ * The share fetch context for a final share fetch request.
+ */
+public class FinalContext extends ShareFetchContext {
+
+    private final static Logger log = 
LoggerFactory.getLogger(FinalContext.class);
+
+    public FinalContext() {
+    }
+
+    @Override
+    boolean isTraceEnabled() {
+        return log.isTraceEnabled();
+    }
+
+    @Override
+    int responseSize(LinkedHashMap<TopicIdPartition, 
ShareFetchResponseData.PartitionData> updates, short version) {
+        return ShareFetchResponse.sizeOf(version, 
updates.entrySet().iterator());
+    }
+
+    @Override
+    ShareFetchResponse updateAndGenerateResponseData(String groupId, Uuid 
memberId,
+                                                     
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
+        log.debug("Final context returning {}", 
partitionsToLogString(updates.keySet()));
+        return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, 0,
+                updates.entrySet().iterator(), Collections.emptyList()));
+    }
+
+    @Override
+    ErroneousAndValidPartitionData getErroneousAndValidTopicIdPartitions() {
+        return new ErroneousAndValidPartitionData();
+    }
+}

Review Comment:
   nit: missing newline at the end of the file



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -191,6 +208,97 @@ public CompletableFuture<Map<TopicIdPartition, 
ShareAcknowledgeResponseData.Part
         return future;
     }
 
+    public ShareFetchContext newContext(String groupId, Map<TopicIdPartition,

Review Comment:
   nit: this public method is missing Javadoc



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -191,6 +208,97 @@ public CompletableFuture<Map<TopicIdPartition, 
ShareAcknowledgeResponseData.Part
         return future;
     }
 
+    public ShareFetchContext newContext(String groupId, Map<TopicIdPartition,
+            ShareFetchRequest.SharePartitionData> shareFetchData, 
List<TopicIdPartition> toForget, ShareFetchMetadata reqMetadata) {
+        ShareFetchContext context;
+        // TopicPartition with maxBytes as 0 should not be added in the 
cachedPartitions
+        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> 
shareFetchDataWithMaxBytes = new HashMap<>();
+        shareFetchData.forEach((tp, sharePartitionData) -> {
+            if (sharePartitionData.maxBytes > 0) 
shareFetchDataWithMaxBytes.put(tp, sharePartitionData);
+        });
+        // If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should 
remove the existing sessions. Also, start a
+        // new session in case it is INITIAL_EPOCH. Hence, we need to treat 
them as special cases.
+        if (reqMetadata.isFull()) {
+            ShareSessionKey key = shareSessionKey(groupId, 
reqMetadata.memberId());
+            if (reqMetadata.epoch() == ShareFetchMetadata.FINAL_EPOCH) {
+                // If the epoch is FINAL_EPOCH, don't try to create a new 
session.
+                if (!shareFetchDataWithMaxBytes.isEmpty()) {
+                    throw Errors.INVALID_REQUEST.exception();
+                }
+                context = new FinalContext();
+                synchronized (cache) {
+                    if (cache.remove(key) != null) {
+                        log.debug("Removed share session with key {}", key);
+                    }
+                }
+            } else {
+                if (cache.remove(key) != null) {
+                    log.debug("Removed share session with key {}", key);
+                }
+                ImplicitLinkedHashCollection<CachedSharePartition> 
cachedSharePartitions = new
+                        
ImplicitLinkedHashCollection<>(shareFetchDataWithMaxBytes.size());
+                shareFetchDataWithMaxBytes.forEach((topicIdPartition, reqData) 
->
+                    cachedSharePartitions.mustAdd(new 
CachedSharePartition(topicIdPartition, reqData, false)));
+                ShareSessionKey responseShareSessionKey = 
cache.maybeCreateSession(groupId, reqMetadata.memberId(),

Review Comment:
   What if `responseShareSessionKey` is null here?



##########
server/src/main/java/org/apache/kafka/server/share/ShareSession.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.requests.ShareFetchRequest;
+import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
+
+public class ShareSession {
+
+    // Helper enum to return the possible type of modified list of 
TopicIdPartitions in cache
+    public enum ModifiedTopicIdPartitionType {
+        ADDED,
+        UPDATED,
+        REMOVED
+    }
+
+    private final ShareSessionKey key;
+    private final ImplicitLinkedHashCollection<CachedSharePartition> 
partitionMap;
+    private final long creationMs;
+
+    private long lastUsedMs;
+    // visible for testing
+    public int epoch;
+    // This is used by the ShareSessionCache to store the last known size of 
this session.
+    // If this is -1, the Session is not in the cache.
+    private int cachedSize = -1;
+
+    /**
+     * The share session.
+     * Each share session is protected by its own lock, which must be taken 
before mutable
+     * fields are read or modified.  This includes modification of the share 
session partition map.
+     *
+     * @param key                The share session key to identify the share 
session uniquely.
+     * @param partitionMap       The CachedPartitionMap.
+     * @param creationMs         The time in milliseconds when this share 
session was created.
+     * @param lastUsedMs         The last used time in milliseconds. This 
should only be updated by
+     *                           ShareSessionCache#touch.
+     * @param epoch              The share session sequence number.
+     */
+    public ShareSession(ShareSessionKey key, 
ImplicitLinkedHashCollection<CachedSharePartition> partitionMap,
+                        long creationMs, long lastUsedMs, int epoch) {
+        this.key = key;
+        this.partitionMap = partitionMap;
+        this.creationMs = creationMs;
+        this.lastUsedMs = lastUsedMs;
+        this.epoch = epoch;
+    }
+
+    public ShareSessionKey key() {
+        return key;
+    }
+
+    synchronized public int cachedSize() {
+        return cachedSize;
+    }
+
+    synchronized public void cachedSize(int size) {
+        cachedSize = size;
+    }
+
+    synchronized public long lastUsedMs() {
+        return lastUsedMs;
+    }
+
+    synchronized public void lastUsedMs(long ts) {
+        lastUsedMs = ts;
+    }
+
+    synchronized public ImplicitLinkedHashCollection<CachedSharePartition> 
partitionMap() {
+        return partitionMap;
+    }
+
+    // Visible for testing
+    synchronized public int epoch() {
+        return epoch;
+    }
+
+    synchronized public int size() {
+        return partitionMap.size();
+    }
+
+    synchronized public Boolean isEmpty() {
+        return partitionMap.isEmpty();
+    }
+
+    synchronized public LastUsedKey lastUsedKey() {
+        return new LastUsedKey(key, lastUsedMs);
+    }
+
+    // Visible for testing
+    synchronized public long creationMs() {
+        return creationMs;
+    }
+
+    // Update the cached partition data based on the request.
+    public Map<ModifiedTopicIdPartitionType, List<TopicIdPartition>> 
update(Map<TopicIdPartition,

Review Comment:
   Can we make this a synchronized method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to