wernerdv commented on code in PR #20158:
URL: https://github.com/apache/kafka/pull/20158#discussion_r2716847779


##########
server/src/main/java/org/apache/kafka/server/FetchContext.java:
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.FetchSession.CachedPartition;
+import org.apache.kafka.server.FetchSession.FetchSessionCache;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+
+import static 
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
+
+public sealed interface FetchContext {
+    /**
+     * Get the fetch offset for a given partition.
+     */
+    Optional<Long> getFetchOffset(TopicIdPartition part);
+
+    /**
+     * Apply a function to each partition in the fetch request.
+     */
+    void foreachPartition(BiConsumer<TopicIdPartition, 
FetchRequest.PartitionData> fun);
+
+    /**
+     * Get the response size to be used for quota computation. Since we are 
returning an empty response in case of
+     * throttling, we are not supposed to update the context until we know 
that we are not going to throttle.
+     */
+    int getResponseSize(LinkedHashMap<TopicIdPartition, 
FetchResponseData.PartitionData> updates, short versionId);
+
+    /**
+     * Updates the fetch context with new partition information.  Generates 
response data.
+     * The response data may require subsequent down-conversion.
+     */
+    FetchResponse 
updateAndGenerateResponseData(LinkedHashMap<TopicIdPartition, 
FetchResponseData.PartitionData> updates, List<Node> nodeEndpoints);
+
+    default String partitionsToLogString(Collection<TopicIdPartition> 
partitions, boolean isTraceEnabled) {
+        return FetchSession.partitionsToLogString(partitions, isTraceEnabled);
+    }
+
+    /**
+     * Return an empty throttled response due to quota violation.
+     */
+    default FetchResponse getThrottledResponse(int throttleTimeMs, List<Node> 
nodeEndpoints) {
+        return FetchResponse.of(Errors.NONE, throttleTimeMs, 
INVALID_SESSION_ID, new LinkedHashMap<>(), nodeEndpoints);
+    }
+
+    /**
+     * The fetch context for a fetch request that had a session error.
+     */
+    final class SessionErrorContext implements FetchContext {
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(SessionErrorContext.class);
+
+        private final Errors error;
+
+        public SessionErrorContext(Errors error) {
+            this.error = error;
+        }
+
+        @Override
+        public Optional<Long> getFetchOffset(TopicIdPartition part) {
+            return Optional.empty();
+        }
+
+        @Override
+        public void foreachPartition(BiConsumer<TopicIdPartition, 
FetchRequest.PartitionData> fun) {
+        }
+
+        @Override
+        public int getResponseSize(LinkedHashMap<TopicIdPartition, 
FetchResponseData.PartitionData> updates, short versionId) {
+            return FetchResponse.sizeOf(versionId, new Iterator<>() {
+                @Override
+                public boolean hasNext() {
+                    return false;
+                }
+
+                @Override
+                public Map.Entry<TopicIdPartition, 
FetchResponseData.PartitionData> next() {
+                    throw new NoSuchElementException();
+                }
+            });
+        }
+
+        /**
+         * Because of the fetch session error, we don't know what partitions 
were supposed to be in this request.
+         */
+        @Override
+        public FetchResponse 
updateAndGenerateResponseData(LinkedHashMap<TopicIdPartition, 
FetchResponseData.PartitionData> updates,
+                                                           List<Node> 
nodeEndpoints) {
+            LOGGER.debug("Session error fetch context returning {}", error);
+            return FetchResponse.of(error, 0, INVALID_SESSION_ID, new 
LinkedHashMap<>(), nodeEndpoints);
+        }
+    }
+
+    /**
+     * The fetch context for a sessionless fetch request.
+     */
+    final class SessionlessFetchContext implements FetchContext {
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(SessionlessFetchContext.class);
+
+        private final Map<TopicIdPartition, FetchRequest.PartitionData> 
fetchData;
+
+        /**
+         * @param fetchData The partition data from the fetch request.
+         */
+        public SessionlessFetchContext(Map<TopicIdPartition, 
FetchRequest.PartitionData> fetchData) {
+            this.fetchData = fetchData;
+        }
+
+        @Override
+        public Optional<Long> getFetchOffset(TopicIdPartition part) {
+            return Optional.ofNullable(fetchData.get(part)).map(data -> 
data.fetchOffset);
+        }
+
+        @Override
+        public void foreachPartition(BiConsumer<TopicIdPartition, 
FetchRequest.PartitionData> fun) {
+            fetchData.forEach(fun);
+        }
+
+        @Override
+        public int getResponseSize(LinkedHashMap<TopicIdPartition, 
FetchResponseData.PartitionData> updates, short versionId) {
+            return FetchResponse.sizeOf(versionId, 
updates.entrySet().iterator());
+        }
+
+        @Override
+        public FetchResponse 
updateAndGenerateResponseData(LinkedHashMap<TopicIdPartition, 
FetchResponseData.PartitionData> updates,
+                                                           List<Node> 
nodeEndpoints) {
+            LOGGER.debug("Sessionless fetch context returning {}", 
partitionsToLogString(updates.keySet(), LOGGER.isTraceEnabled()));
+            return FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
updates, nodeEndpoints);
+        }
+    }
+
+    /**
+     * The fetch context for a full fetch request.
+     */
+    final class FullFetchContext implements FetchContext {
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(FullFetchContext.class);
+
+        private final Time time;
+        private final FetchSessionCache cache;
+        private final Map<TopicIdPartition, FetchRequest.PartitionData> 
fetchData;
+        private final boolean usesTopicIds;
+        private final boolean isFromFollower;
+
+        public FullFetchContext(Time time,
+                                FetchSessionCacheShard cacheShard,
+                                Map<TopicIdPartition, 
FetchRequest.PartitionData> fetchData,
+                                boolean usesTopicIds,
+                                boolean isFromFollower) {
+            this(time, new FetchSessionCache(List.of(cacheShard)), fetchData, 
usesTopicIds, isFromFollower);
+        }
+
+        /**
+         * @param time           The clock to use
+         * @param cache          The fetch session cache
+         * @param fetchData      The partition data from the fetch request
+         * @param usesTopicIds   True if this session should use topic IDs
+         * @param isFromFollower True if this fetch request came from a 
follower
+         */
+        public FullFetchContext(Time time,
+                                FetchSessionCache cache,
+                                Map<TopicIdPartition, 
FetchRequest.PartitionData> fetchData,
+                                boolean usesTopicIds,
+                                boolean isFromFollower) {
+            this.time = time;
+            this.cache = cache;
+            this.fetchData = fetchData;
+            this.usesTopicIds = usesTopicIds;
+            this.isFromFollower = isFromFollower;
+        }
+
+        @Override
+        public Optional<Long> getFetchOffset(TopicIdPartition part) {
+            return Optional.ofNullable(fetchData.get(part)).map(data -> 
data.fetchOffset);
+        }
+
+        @Override
+        public void foreachPartition(BiConsumer<TopicIdPartition, 
FetchRequest.PartitionData> fun) {
+            fetchData.forEach(fun);
+        }
+
+        @Override
+        public int getResponseSize(LinkedHashMap<TopicIdPartition, 
FetchResponseData.PartitionData> updates, short versionId) {
+            return FetchResponse.sizeOf(versionId, 
updates.entrySet().iterator());
+        }
+
+        @Override
+        public FetchResponse 
updateAndGenerateResponseData(LinkedHashMap<TopicIdPartition, 
FetchResponseData.PartitionData> updates,
+                                                           List<Node> 
nodeEndpoints) {
+            FetchSessionCacheShard cacheShard = cache.getNextCacheShard();
+            int responseSessionId = 
cacheShard.maybeCreateSession(time.milliseconds(), isFromFollower,
+                updates.size(), usesTopicIds, () -> createNewSession(updates));
+            LOGGER.debug("Full fetch context with session id {} returning {}",
+                responseSessionId, partitionsToLogString(updates.keySet(), 
LOGGER.isTraceEnabled()));
+
+            return FetchResponse.of(Errors.NONE, 0, responseSessionId, 
updates, nodeEndpoints);
+        }
+
+        private ImplicitLinkedHashCollection<CachedPartition> createNewSession(
+                LinkedHashMap<TopicIdPartition, 
FetchResponseData.PartitionData> updates
+        ) {
+            ImplicitLinkedHashCollection<CachedPartition> cachedPartitions = 
new ImplicitLinkedHashCollection<>(updates.size());
+            updates.forEach((part, respData) -> {
+                FetchRequest.PartitionData reqData = fetchData.get(part);
+                cachedPartitions.mustAdd(new CachedPartition(part, reqData, 
respData));
+            });
+
+            return cachedPartitions;
+        }
+    }
+
+    /**
+     * The fetch context for an incremental fetch request.
+     */
+    final class IncrementalFetchContext implements FetchContext {
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(IncrementalFetchContext.class);
+
+        private final FetchMetadata reqMetadata;
+        private final FetchSession session;
+        private final Map<Uuid, String> topicNames;
+
+        /**
+         * @param reqMetadata  The request metadata
+         * @param session      The incremental fetch request session
+         * @param topicNames   A mapping from topic ID to topic name used to 
resolve partitions already in the session
+         */
+        public IncrementalFetchContext(FetchMetadata reqMetadata,
+                                       FetchSession session,
+                                       Map<Uuid, String> topicNames) {
+            this.reqMetadata = reqMetadata;
+            this.session = session;
+            this.topicNames = topicNames;
+        }
+
+        @Override
+        public Optional<Long> getFetchOffset(TopicIdPartition part) {
+            return session.getFetchOffset(part);
+        }
+
+        @Override
+        public void foreachPartition(BiConsumer<TopicIdPartition, 
FetchRequest.PartitionData> fun) {
+            // Take the session lock and iterate over all the cached 
partitions.
+            synchronized (session) {
+                session.partitionMap().forEach(part -> {
+                    // Try to resolve an unresolved partition if it does not 
yet have a name
+                    if (session.usesTopicIds())
+                        part.maybeResolveUnknownName(topicNames);
+                    fun.accept(new TopicIdPartition(part.topicId(), new 
TopicPartition(part.topic(), part.partition())), part.reqData());
+                });
+            }
+        }
+
+        /**
+         * Iterator that goes over the given partition map and selects 
partitions that need to be included in the response.
+         * If updateFetchContextAndRemoveUnselected is set to true, the fetch 
context will be updated for the selected
+         * partitions and also remove unselected ones as they are encountered.
+         */
+        private class PartitionIterator implements 
Iterator<Map.Entry<TopicIdPartition, FetchResponseData.PartitionData>> {
+            private final Iterator<Map.Entry<TopicIdPartition, 
FetchResponseData.PartitionData>> iter;
+            private final boolean updateFetchContextAndRemoveUnselected;
+            private Map.Entry<TopicIdPartition, 
FetchResponseData.PartitionData> nextElement;
+
+            public PartitionIterator(Iterator<Map.Entry<TopicIdPartition, 
FetchResponseData.PartitionData>> iter,
+                                     boolean 
updateFetchContextAndRemoveUnselected) throws NoSuchElementException {
+                this.iter = iter;
+                this.updateFetchContextAndRemoveUnselected = 
updateFetchContextAndRemoveUnselected;
+            }
+
+            @Override
+            public boolean hasNext() {
+                while ((nextElement == null) && iter.hasNext()) {
+                    Map.Entry<TopicIdPartition, 
FetchResponseData.PartitionData> element = iter.next();
+                    TopicIdPartition topicPart = element.getKey();
+                    FetchResponseData.PartitionData respData = 
element.getValue();
+                    CachedPartition cachedPart = 
session.partitionMap().find(new CachedPartition(topicPart));
+                    assert cachedPart != null;

Review Comment:
   Without this check, IDEA warns: `Method invocation 'maybeUpdateResponseData' 
may produce 'NullPointerException'`.
   
   I fixed it to: 
   `boolean mustRespond = cachedPart != null && 
cachedPart.maybeUpdateResponseData(respData, 
updateFetchContextAndRemoveUnselected);`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to