junrao commented on code in PR #17709:
URL: https://github.com/apache/kafka/pull/17709#discussion_r1833506455
##########
share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.fetch;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.storage.log.FetchParams;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * The ShareFetch class is used to store the fetch parameters for a share fetch request.
+ */
+public class ShareFetch {
+
+    /**
+     * The future that will be completed when the fetch is done.
+     */
+    private final CompletableFuture<Map<TopicIdPartition, PartitionData>> future;
+
+    /**
+     * The fetch parameters for the fetch request.
+     */
+    private final FetchParams fetchParams;
+    /**
+     * The group id of the share group that is fetching the records.
+     */
+    private final String groupId;
+    /**
+     * The member id of the share group that is fetching the records.
+     */
+    private final String memberId;
+    /**
+     * The maximum number of bytes that can be fetched for each partition.
+     */
+    private final Map<TopicIdPartition, Integer> partitionMaxBytes;
+    /**
+     * The maximum number of records that can be fetched for the request.
+     */
+    private final int maxFetchRecords;
+    /**
+     * The partitions that had an error during the fetch.
+     */
+    private volatile Map<TopicIdPartition, Throwable> erroneous;
+
+    public ShareFetch(
+        FetchParams fetchParams,
+        String groupId,
+        String memberId,
+        CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        Map<TopicIdPartition, Integer> partitionMaxBytes,
+        int maxFetchRecords
+    ) {
+        this.fetchParams = fetchParams;
+        this.groupId = groupId;
+        this.memberId = memberId;
+        this.future = future;
+        this.partitionMaxBytes = partitionMaxBytes;
+        this.maxFetchRecords = maxFetchRecords;
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public String memberId() {
+        return memberId;
+    }
+
+    public Map<TopicIdPartition, Integer> partitionMaxBytes() {
+        return partitionMaxBytes;
+    }
+
+    public FetchParams fetchParams() {
+        return fetchParams;
+    }
+
+    public int maxFetchRecords() {
+        return maxFetchRecords;
+    }
+
+    /**
+     * Add an erroneous partition to the share fetch request. If the erroneous map is null, it will
+     * be created.
+     * <p>
+     * The method is synchronized to avoid concurrent modification of the erroneous map, as for
+     * some partitions the pending initialization can be on some threads and for other partitions
+     * share fetch request can be processed in purgatory.
+     *
+     * @param topicIdPartition The partition that had an error.
+     * @param throwable The error that occurred.
+     */
+    public synchronized void addErroneous(TopicIdPartition topicIdPartition, Throwable throwable) {
+        if (erroneous == null) {
+            erroneous = new HashMap<>();
+        }
+        erroneous.put(topicIdPartition, throwable);
+    }
+
+    /**
+     * Check if the share fetch request is completed.
+     * @return true if the request is completed, false otherwise.
+     */
+    public boolean isCompleted() {
+        return future.isDone();
+    }
+
+    /**
+     * Check if all the partitions in the request have errored.
+     * @return true if all the partitions in the request have errored, false otherwise.
+     */
+    public boolean isErrored() {
+        return erroneous != null && erroneous.size() == partitionMaxBytes().size();
+    }
+
+    /**
+     * May be complete the share fetch request with the given partition data. If the request is already completed,
+     * this method does nothing. If there are any erroneous partitions, they will be added to the response.
+     *
+     * @param partitionData The partition data to complete the fetch with.
+     */
+    public void maybeComplete(Map<TopicIdPartition, PartitionData> partitionData) {
+        if (isCompleted()) {
+            return;
+        }
+
+        Map<TopicIdPartition, PartitionData> response = new HashMap<>(partitionData);
+        // Add any erroneous partitions to the response.
+        addErroneousToResponse(response);
+        future.complete(response);
+    }
+
+    /**
+     * May be complete the share fetch request with the given exception for the topicIdPartitions.

Review Comment:
   May be => Maybe



##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -247,13 +248,26 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
     val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
     val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
-    val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
-    val shareFetchResponseData = shareFetchResponse.data()
-    assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
-    assertEquals(1, shareFetchResponseData.responses().size())
-    assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
-    assertEquals(3, shareFetchResponseData.responses().get(0).partitions().size())
+    // For the multi partition fetch request, the response may not be available in the first attempt
+    // as the share partitions might not be initialized yet. So, we retry until we get the response.
+    var responses = Seq[ShareFetchResponseData.PartitionData]()
+    TestUtils.waitUntilTrue(() => {
+      val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
+      val shareFetchResponseData = shareFetchResponse.data()
+      assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+      val partitionsCount = shareFetchResponseData.responses().get(0).partitions().size()
+      if (partitionsCount > 0) {
+        assertEquals(1, shareFetchResponseData.responses().size())
+        assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
+        shareFetchResponseData.responses().get(0).partitions().foreach(partitionData => {
+          if (!partitionData.acquiredRecords().isEmpty) {
+            responses = responses :+ partitionData
+          }
+        })
+      }
+      responses.size == 3

Review Comment:
   Should we reset responses during retry?

   Ditto below.
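   For illustration only (not code from the PR), a minimal sketch of what a per-retry reset could look like, reusing the helpers and conversions already used in the diff above (`connectAndReceive`, `TestUtils.waitUntilTrue`, the `shareFetchRequest` built earlier); the failure message passed to `waitUntilTrue` is invented:

       // Sketch: clear the accumulator before each attempt so partitions collected in an
       // earlier, partially initialized attempt are not counted twice.
       var responses = Seq[ShareFetchResponseData.PartitionData]()
       TestUtils.waitUntilTrue(() => {
         responses = Seq.empty // reset on every retry
         val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
         val shareFetchResponseData = shareFetchResponse.data()
         assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
         shareFetchResponseData.responses().get(0).partitions().foreach(partitionData => {
           if (!partitionData.acquiredRecords().isEmpty)
             responses = responses :+ partitionData
         })
         responses.size == 3
       }, "Did not receive acquired records for all 3 partitions")

   One caveat: if records acquired in an earlier attempt are not returned again on a subsequent fetch, a per-attempt reset may never reach 3, which may be why the current patch accumulates across attempts.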
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -296,15 +296,15 @@ private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.Partition
                 }
             }
         }
-        return accumulatedSize >= shareFetchData.fetchParams().minBytes;
+        return accumulatedSize >= shareFetch.fetchParams().minBytes;
     }
 
     private LogOffsetMetadata endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
         Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
         LogOffsetSnapshot offsetSnapshot = partition.fetchOffsetSnapshot(Optional.empty(), true);
         // The FetchIsolation type that we use for share fetch is FetchIsolation.HIGH_WATERMARK. In the future, we can
         // extend it to support other FetchIsolation types.
-        FetchIsolation isolationType = shareFetchData.fetchParams().isolation;
+        FetchIsolation isolationType = shareFetch.fetchParams().isolation;

Review Comment:
   `replicaManager.getPartitionOrException` above throws an exception. Should we handle that and add it to shareFetch.erroneous?



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -658,25 +651,26 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
         });
     }
 
-    private void maybeCompleteInitializationWithException(
+    private void handleInitializationException(
         SharePartitionKey sharePartitionKey,
-        CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        ShareFetch shareFetch,
         Throwable throwable) {
         if (throwable instanceof LeaderNotAvailableException) {
             log.debug("The share partition with key {} is not initialized yet", sharePartitionKey);
-            // Do not process the fetch request for this partition as the leader is not initialized yet.
-            // The fetch request will be retried in the next poll.
-            // TODO: Add the request to delayed fetch purgatory.
+            // Skip any handling for this error as the share partition is still loading. The request

Review Comment:
   When do we get a LeaderNotAvailableException? My understanding is that the throwable is based on the error code from ReadShareGroupStateResponse and it doesn't seem to return LeaderNotAvailableException.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -565,74 +562,70 @@ private static String partitionsToLogString(Collection<TopicIdPartition> partiti
     }
 
     // Visible for testing.
-    void processShareFetch(ShareFetchData shareFetchData) {
-        if (shareFetchData.partitionMaxBytes().isEmpty()) {
+    void processShareFetch(ShareFetch shareFetch) {
+        if (shareFetch.partitionMaxBytes().isEmpty()) {
             // If there are no partitions to fetch then complete the future with an empty map.
-            shareFetchData.future().complete(Collections.emptyMap());
+            shareFetch.maybeComplete(Collections.emptyMap());
             return;
         }
-        // Initialize lazily, if required.
-        Map<TopicIdPartition, Throwable> erroneous = null;
         Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
-        for (TopicIdPartition topicIdPartition : shareFetchData.partitionMaxBytes().keySet()) {
+        for (TopicIdPartition topicIdPartition : shareFetch.partitionMaxBytes().keySet()) {
             SharePartitionKey sharePartitionKey = sharePartitionKey(
-                shareFetchData.groupId(),
+                shareFetch.groupId(),
                 topicIdPartition
             );
             SharePartition sharePartition;
             try {
                 sharePartition = getOrCreateSharePartition(sharePartitionKey);
             } catch (Exception e) {
-                // Complete the whole fetch request with an exception if there is an error processing.
-                // The exception currently can be thrown only if there is an error while initializing
-                // the share partition. But skip the processing for other share partitions in the request
-                // as this situation is not expected.
-                log.error("Error processing share fetch request", e);
-                if (erroneous == null) {
-                    erroneous = new HashMap<>();
-                }
-                erroneous.put(topicIdPartition, e);
+                log.debug("Error processing share fetch request", e);
+                shareFetch.addErroneous(topicIdPartition, e);
                 // Continue iteration for other partitions in the request.
                 continue;
             }
             // We add a key corresponding to each share partition in the request in the group so that when there are
             // acknowledgements/acquisition lock timeout etc., we have a way to perform checkAndComplete for all
             // such requests which are delayed because of lack of data to acquire for the share partition.
-            delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
+            DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(shareFetch.groupId(),
+                topicIdPartition.topicId(), topicIdPartition.partition());
+            delayedShareFetchWatchKeys.add(delayedShareFetchKey);
             // We add a key corresponding to each topic partition in the request so that when the HWM is updated
             // for any topic partition, we have a way to perform checkAndComplete for all such requests which are
            // delayed because of lack of data to acquire for the topic partition.
             delayedShareFetchWatchKeys.add(new DelayedShareFetchPartitionKey(topicIdPartition.topicId(), topicIdPartition.partition()));
-            // The share partition is initialized asynchronously, so we need to wait for it to be initialized.
-            // But if the share partition is already initialized, then the future will be completed immediately.
-            // Hence, it's safe to call the maybeInitialize method and then wait for the future to be completed.
-            // TopicPartitionData list will be populated only if the share partition is already initialized.
-            sharePartition.maybeInitialize().whenComplete((result, throwable) -> {
+
+            CompletableFuture<Void> initializationFuture = sharePartition.maybeInitialize();
+            final boolean initialized = initializationFuture.isDone();
+            initializationFuture.whenComplete((result, throwable) -> {
                 if (throwable != null) {
-                    // TODO: Complete error handling for initialization. We have to record the error
-                    // for respective share partition as completing the full request might result in
-                    // some acquired records to not being sent: https://issues.apache.org/jira/browse/KAFKA-17510
-                    maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable);
+                    handleInitializationException(sharePartitionKey, shareFetch, throwable);

Review Comment:
   Since we are triggering delayedShareFetch below, do we need to handle the error for shareFetch here?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -198,7 +198,7 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
         sharePartitions.forEach((topicIdPartition, sharePartition) -> {
-            int partitionMaxBytes = shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
+            int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);

Review Comment:
   Should we skip erroneous partitions in `shareFetch`? Also, when calling `sharePartition.maybeAcquireFetchLock()`, if the partition is in ERROR or FENCED state, should we add the partition to erroneous partitions in `shareFetch` too?



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -507,19 +506,18 @@ public void acknowledgeSessionUpdate(String groupId, ShareRequestMetadata reqMet
      * but as we cannot determine which share partition errored out, we might remove all the share partitions
      * in the request.
      *
-     * @param groupId The group id in the share fetch request.
+     * @param shareFetch The share fetch request.
      * @param topicIdPartitions The topic-partitions in the replica read request.
-     * @param future The future to complete with the exception.
      * @param throwable The exception that occurred while fetching messages.
      */
     public void handleFetchException(
-        String groupId,
+        ShareFetch shareFetch,

Review Comment:
   Could we move this method to `DelayedShareFetch` and make it private since it's only called there?
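   A rough sketch (not the PR's actual code) of how `handleFetchException` might look once moved into `DelayedShareFetch` as a private helper, per the comment above; the parameter types and body are illustrative and lean only on the `ShareFetch` API quoted earlier (`addErroneous`, `maybeComplete`), so the real method may complete the response differently:

       // Hypothetical shape inside DelayedShareFetch: record the replica-read failure against
       // every requested partition and let ShareFetch fold the errors into the response.
       private void handleFetchException(ShareFetch shareFetch,
                                         Collection<TopicIdPartition> topicIdPartitions,
                                         Throwable throwable) {
           topicIdPartitions.forEach(tp -> shareFetch.addErroneous(tp, throwable));
           // maybeComplete adds the erroneous partitions to the response before completing the future.
           shareFetch.maybeComplete(Collections.emptyMap());
       }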