adixitconfluent commented on code in PR #16263:
URL: https://github.com/apache/kafka/pull/16263#discussion_r1637853613


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -191,6 +208,97 @@ public CompletableFuture<Map<TopicIdPartition, 
ShareAcknowledgeResponseData.Part
         return future;
     }
 
+    public ShareFetchContext newContext(String groupId, Map<TopicIdPartition,
+            ShareFetchRequest.SharePartitionData> shareFetchData, 
List<TopicIdPartition> toForget, ShareFetchMetadata reqMetadata) {
+        ShareFetchContext context;
+        // TopicPartition with maxBytes as 0 should not be added in the 
cachedPartitions
+        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> 
shareFetchDataWithMaxBytes = new HashMap<>();
+        shareFetchData.forEach((tp, sharePartitionData) -> {
+            if (sharePartitionData.maxBytes > 0) 
shareFetchDataWithMaxBytes.put(tp, sharePartitionData);
+        });
+        // If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should 
remove the existing sessions. Also, start a
+        // new session in case it is INITIAL_EPOCH. Hence, we need to treat 
them as special cases.
+        if (reqMetadata.isFull()) {
+            ShareSessionKey key = shareSessionKey(groupId, 
reqMetadata.memberId());
+            if (reqMetadata.epoch() == ShareFetchMetadata.FINAL_EPOCH) {
+                // If the epoch is FINAL_EPOCH, don't try to create a new 
session.
+                if (!shareFetchDataWithMaxBytes.isEmpty()) {
+                    throw Errors.INVALID_REQUEST.exception();
+                }
+                context = new FinalContext();
+                synchronized (cache) {
+                    if (cache.remove(key) != null) {
+                        log.debug("Removed share session with key {}", key);
+                    }
+                }
+            } else {
+                if (cache.remove(key) != null) {
+                    log.debug("Removed share session with key {}", key);
+                }
+                ImplicitLinkedHashCollection<CachedSharePartition> 
cachedSharePartitions = new
+                        
ImplicitLinkedHashCollection<>(shareFetchDataWithMaxBytes.size());
+                shareFetchDataWithMaxBytes.forEach((topicIdPartition, reqData) 
->
+                    cachedSharePartitions.mustAdd(new 
CachedSharePartition(topicIdPartition, reqData, false)));
+                ShareSessionKey responseShareSessionKey = 
cache.maybeCreateSession(groupId, reqMetadata.memberId(),

Review Comment:
   thanks for pointing it out. I have added the handling for null case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to