chia7712 commented on code in PR #17957: URL: https://github.com/apache/kafka/pull/17957#discussion_r1864856887
########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -389,79 +389,84 @@ public CompletableFuture<Void> maybeInitialize() { .build()) .build() ).whenComplete((result, exception) -> { - if (exception != null) { - log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, exception); - completeInitializationWithException(future, exception); - return; - } + lock.writeLock().lock(); + try { + if (exception != null) { + log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, exception); + completeInitializationWithException(future, exception); + return; + } - if (result == null || result.topicsData() == null || result.topicsData().size() != 1) { - log.error("Failed to initialize the share partition: {}-{}. Invalid state found: {}.", - groupId, topicIdPartition, result); - completeInitializationWithException(future, new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))); - return; - } + if (result == null || result.topicsData() == null || result.topicsData().size() != 1) { + log.error("Failed to initialize the share partition: {}-{}. Invalid state found: {}.", + groupId, topicIdPartition, result); + completeInitializationWithException(future, new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))); + return; + } - TopicData<PartitionAllData> state = result.topicsData().get(0); - if (state.topicId() != topicIdPartition.topicId() || state.partitions().size() != 1) { - log.error("Failed to initialize the share partition: {}-{}. 
Invalid topic partition response: {}.", - groupId, topicIdPartition, result); - completeInitializationWithException(future, new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))); - return; - } + TopicData<PartitionAllData> state = result.topicsData().get(0); + if (state.topicId() != topicIdPartition.topicId() || state.partitions().size() != 1) { + log.error("Failed to initialize the share partition: {}-{}. Invalid topic partition response: {}.", + groupId, topicIdPartition, result); + completeInitializationWithException(future, new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))); + return; + } - PartitionAllData partitionData = state.partitions().get(0); - if (partitionData.partition() != topicIdPartition.partition()) { - log.error("Failed to initialize the share partition: {}-{}. Invalid partition response: {}.", - groupId, topicIdPartition, partitionData); - completeInitializationWithException(future, new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))); - return; - } + PartitionAllData partitionData = state.partitions().get(0); + if (partitionData.partition() != topicIdPartition.partition()) { + log.error("Failed to initialize the share partition: {}-{}. Invalid partition response: {}.", + groupId, topicIdPartition, partitionData); + completeInitializationWithException(future, new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))); + return; + } - if (partitionData.errorCode() != Errors.NONE.code()) { - KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage()); - log.error("Failed to initialize the share partition: {}-{}. 
Exception occurred: {}.", - groupId, topicIdPartition, partitionData); - completeInitializationWithException(future, ex); - return; - } + if (partitionData.errorCode() != Errors.NONE.code()) { + KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage()); + log.error("Failed to initialize the share partition: {}-{}. Exception occurred: {}.", + groupId, topicIdPartition, partitionData); + completeInitializationWithException(future, ex); + return; + } - try { - startOffset = startOffsetDuringInitialization(partitionData.startOffset()); - } catch (Exception e) { - completeInitializationWithException(future, e); - return; - } - stateEpoch = partitionData.stateEpoch(); - - List<PersisterStateBatch> stateBatches = partitionData.stateBatches(); - for (PersisterStateBatch stateBatch : stateBatches) { - if (stateBatch.firstOffset() < startOffset) { - log.error("Invalid state batch found for the share partition: {}-{}. The base offset: {}" - + " is less than the start offset: {}.", groupId, topicIdPartition, - stateBatch.firstOffset(), startOffset); - completeInitializationWithException(future, new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))); + try { + startOffset = startOffsetDuringInitialization(partitionData.startOffset()); + } catch (Exception e) { + completeInitializationWithException(future, e); return; } - InFlightBatch inFlightBatch = new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(), - stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), null); - cachedState.put(stateBatch.firstOffset(), inFlightBatch); - } - // Update the endOffset of the partition. 
- if (!cachedState.isEmpty()) { - // If the cachedState is not empty, findNextFetchOffset flag is set to true so that any AVAILABLE records - // in the cached state are not missed - findNextFetchOffset.set(true); - endOffset = cachedState.lastEntry().getValue().lastOffset(); - // In case the persister read state RPC result contains no AVAILABLE records, we can update cached state - // and start/end offsets. - maybeUpdateCachedStateAndOffsets(); - } else { - endOffset = startOffset; + stateEpoch = partitionData.stateEpoch(); + + List<PersisterStateBatch> stateBatches = partitionData.stateBatches(); + for (PersisterStateBatch stateBatch : stateBatches) { + if (stateBatch.firstOffset() < startOffset) { + log.error("Invalid state batch found for the share partition: {}-{}. The base offset: {}" + + " is less than the start offset: {}.", groupId, topicIdPartition, + stateBatch.firstOffset(), startOffset); + completeInitializationWithException(future, new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))); + return; + } + InFlightBatch inFlightBatch = new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(), + stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), null); + cachedState.put(stateBatch.firstOffset(), inFlightBatch); + } + // Update the endOffset of the partition. + if (!cachedState.isEmpty()) { + // If the cachedState is not empty, findNextFetchOffset flag is set to true so that any AVAILABLE records + // in the cached state are not missed + findNextFetchOffset.set(true); + endOffset = cachedState.lastEntry().getValue().lastOffset(); + // In case the persister read state RPC result contains no AVAILABLE records, we can update cached state + // and start/end offsets. + maybeUpdateCachedStateAndOffsets(); + } else { + endOffset = startOffset; + } + // Set the partition state to Active and complete the future. 
+ partitionState = SharePartitionState.ACTIVE; + future.complete(null); Review Comment: > I don't see that should be a problem. There can anyways be multiple threads waiting on the same lock in other methods as well. Am I missing something here? Yes, that’s a great question. One possible deadlock path from my perspective could occur as follows: a_thread: 1. Holds the lock of a shared partition. 2. Attempts to complete a delayed share fetch. 3. Waits for the lock of the delayed share fetch. b_thread: 1. Holds the lock of the delayed share fetch. 2. Attempts to access the shared partition. 3. Waits for the lock of the shared partition. Based on these rules, the potential deadlock path could be as shown at [this line](https://github.com/apache/kafka/blob/615f1a0bf999cd0b5ccf38c09e6afaba89538e9f/core/src/main/java/kafka/server/share/SharePartitionManager.java#L589). The PersisterStateManager thread acts as a_thread, while b_thread could be any thread attempting to complete a delayed share fetch while handling other types of requests. At any rate, my point is that we should hold the lock only when absolutely necessary, even if we haven’t encountered an existing deadlock case yet. This minimizes the risk and improves overall robustness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org