junrao commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r609895193
##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. It maintains the lineage of segments
+ * with respect to leader epochs.
+ * <p>
+ * A remote log segment can go through the state transitions mentioned in {@link RemoteLogSegmentState}.
+ * <p>
+ * This class holds all the segments that have not yet reached the terminal state, viz. DELETE_SEGMENT_FINISHED. That
+ * means any segment reaching the terminal state is cleared from this instance.
+ * This class provides different methods to fetch segment metadata, like {@link #remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, {@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics for fetching a segment based on its state.
+ * <p>
+ * <ul>
+ * <li>
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * <br>
+ * A segment in this state has not yet been copied successfully. Such segments are not accessible for reads, but
+ * they are considered for cleanup when a partition is deleted.
+ * </li>
+ * <li>
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}:
+ * <br>
+ * A segment in this state has been copied successfully, so it is accessible for reads. It is also available for
+ * any cleanup activity, like deletion of segments by the caller of this class.
+ * </li>
+ * <li>
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}:
+ * A segment in this state is being deleted. That means it is not available for reads, but it is still available
+ * for any cleanup activity, like deletion of segments by the caller of this class.
+ * </li>
+ * <li>
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}:
+ * A segment in this state has already been deleted. That means it is not available for any activity, including
+ * reads or cleanup. This cache clears entries containing this state.
+ * </li>
+ * </ul>
+ *
+ * <p>
+ * The table below summarizes whether a segment with the respective state is visible to the given methods.
+ * <pre>
+ * +---------------------------------+----------------------+------------------------+-------------------------+-------------------------+
+ * | Method / SegmentState           | COPY_SEGMENT_STARTED | COPY_SEGMENT_FINISHED  | DELETE_SEGMENT_STARTED  | DELETE_SEGMENT_FINISHED |
+ * |---------------------------------+----------------------+------------------------+-------------------------+-------------------------|
+ * | remoteLogSegmentMetadata        | No                   | Yes                    | No                      | No                      |
+ * | (int leaderEpoch, long offset)  |                      |                        |                         |                         |
+ * |---------------------------------+----------------------+------------------------+-------------------------+-------------------------|
+ * | listRemoteLogSegments           | Yes                  | Yes                    | Yes                     | No                      |
+ * | (int leaderEpoch)               |                      |                        |                         |                         |
+ * |---------------------------------+----------------------+------------------------+-------------------------+-------------------------|
+ * | listAllRemoteLogSegments()      | Yes                  | Yes                    | Yes                     | No                      |
+ * |                                 |                      |                        |                         |                         |
+ * +---------------------------------+----------------------+------------------------+-------------------------+-------------------------+
+ * </pre>
+ * </p>
+ */
+public class RemoteLogMetadataCache {
+
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+    // It contains all the segment-id to metadata mappings that have not reached the terminal state, viz. DELETE_SEGMENT_FINISHED.
+    private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata
+            = new ConcurrentHashMap<>();
+
+    // It maps a leader epoch to the respective entry containing the state.
+    private final ConcurrentMap<Integer, RemoteLogLeaderEpochState> leaderEpochEntries = new ConcurrentHashMap<>();

Review comment:
   > One way to do that is to clear the entry when the respective RemoteLogLeaderEpochState is empty. That means all the segments reached DELETE_SEGMENT_FINISHED state.
   > This is not currently addressed. I plan to look into it when we integrate these APIs with RemoteLogManager by exploring other options too.

   Could we add a TODO comment here so that we don't forget about it?
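For illustration, one possible shape of the cleanup that comment describes — a minimal sketch, not code from this PR. It assumes a hypothetical isEmpty() helper on RemoteLogLeaderEpochState that returns true once the state no longer tracks any segments (i.e. all of them reached DELETE_SEGMENT_FINISHED):

    // Hypothetical sketch only, not part of this PR. Assumes RemoteLogLeaderEpochState
    // exposes an isEmpty() helper that is true once no segments remain in its state.
    private void maybeRemoveLeaderEpochEntry(int leaderEpoch) {
        // computeIfPresent removes the mapping atomically when the remapping
        // function returns null, so an empty state drops the epoch entry.
        leaderEpochEntries.computeIfPresent(leaderEpoch,
            (epoch, state) -> state.isEmpty() ? null : state);
    }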
##########
File path: settings.gradle
##########
@@ -29,6 +29,7 @@ include 'clients',
     'log4j-appender',
     'metadata',
     'raft',
+    'storage',

Review comment:
   This is redundant.

##########
File path: storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManagerTest.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class InmemoryRemoteStorageManagerTest {
+    private static final Logger log = LoggerFactory.getLogger(InmemoryRemoteStorageManagerTest.class);
+
+    private static final TopicPartition TP = new TopicPartition("foo", 1);
+    private static final File DIR = TestUtils.tempDirectory("inmem-rsm-");
+    private static final Random RANDOM = new Random();
+
+    @Test
+    public void testCopyLogSegment() throws Exception {
+        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
+        RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
+        LogSegmentData logSegmentData = createLogSegmentData();
+        // Copy all the segment data.
+        rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
+
+        // Check that the segment data exists in the in-memory RSM.
+        boolean containsSegment = rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForSegment(segmentMetadata));
+        Assertions.assertTrue(containsSegment);
+
+        // Check that the indexes exist in the in-memory RSM.
+        for (RemoteStorageManager.IndexType indexType : RemoteStorageManager.IndexType.values()) {
+            boolean containsIndex = rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForIndex(segmentMetadata, indexType));
+            Assertions.assertTrue(containsIndex);
+        }
+    }
+
+    private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
+        TopicIdPartition topicPartition = new TopicIdPartition(Uuid.randomUuid(), TP);
+        RemoteLogSegmentId id = new RemoteLogSegmentId(topicPartition, Uuid.randomUuid());
+        return new RemoteLogSegmentMetadata(id, 100L, 200L, System.currentTimeMillis(), 0,
+                System.currentTimeMillis(), 100, Collections.singletonMap(1, 100L));
+    }
+
+    @Test
+    public void testFetchLogSegmentIndexes() throws Exception {
+        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
+        RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
+        int segSize = 100;
+        LogSegmentData logSegmentData = createLogSegmentData(segSize);
+
+        // Copy the segment.
+        rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
+
+        // Check that segment data exists for the copied segment.
+        try (InputStream segmentStream = rsm.fetchLogSegment(segmentMetadata, 0)) {
+            checkContentSame(segmentStream, logSegmentData.logSegment());
+        }
+
+        HashMap<RemoteStorageManager.IndexType, Path> expectedIndexToPaths = new HashMap<>();
+        expectedIndexToPaths.put(RemoteStorageManager.IndexType.OFFSET, logSegmentData.offsetIndex());
+        expectedIndexToPaths.put(RemoteStorageManager.IndexType.TIMESTAMP, logSegmentData.timeIndex());
+        expectedIndexToPaths.put(RemoteStorageManager.IndexType.TRANSACTION, logSegmentData.txnIndex());
+        expectedIndexToPaths.put(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, logSegmentData.producerSnapshotIndex());
+
+        // Check that all segment indexes exist for the copied segment.
+        for (Map.Entry<RemoteStorageManager.IndexType, Path> entry : expectedIndexToPaths.entrySet()) {
+            RemoteStorageManager.IndexType indexType = entry.getKey();
+            Path indexPath = entry.getValue();
+            log.info("Fetching index type: {}, indexPath: {}", indexType, indexPath);

Review comment:
   Could we move this to debug level then?
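The requested change would be a one-line switch to slf4j's debug level, for example:

    // Per-fetch logging is noisy at info level; debug keeps it available when needed.
    log.debug("Fetching index type: {}, indexPath: {}", indexType, indexPath);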
##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * This class represents the in-memory state of segments associated with a leader epoch. This includes the mapping of
+ * offsets to segment ids, and unreferenced segments which are not mapped to any offset but still exist in remote storage.
+ * <p>
+ * This is used by {@link RemoteLogMetadataCache} to track the segments for each leader epoch.
+ */
+class RemoteLogLeaderEpochState {
+
+    // It contains the mapping of offsets to segment ids for segments in COPY_SEGMENT_FINISHED state.
+    private final NavigableMap<Long, RemoteLogSegmentId> offsetToId = new ConcurrentSkipListMap<>();
+
+    /**
+     * It represents unreferenced segments for this leader epoch. It contains segments still in COPY_SEGMENT_STARTED
+     * or DELETE_SEGMENT_STARTED state, or segments that have been replaced by callers with other segments having the
+     * same start offset for the leader epoch. These will be returned by {@link RemoteLogMetadataCache#listAllRemoteLogSegments()}
+     * and {@link RemoteLogMetadataCache#listRemoteLogSegments(int leaderEpoch)} so that callers can clean them up if
+     * they still exist. These will be cleared from the cache once they reach the DELETE_SEGMENT_FINISHED state.
+     */
+    private final Set<RemoteLogSegmentId> unreferencedSegmentIds = ConcurrentHashMap.newKeySet();
+
+    // It represents the highest log offset of the segments that were updated with updateHighestLogOffset.
+    private volatile Long highestLogOffset;
+
+    /**
+     * Returns all the segments associated with this leader epoch, sorted by start offset in ascending order.
+     *
+     * @param idToSegmentMetadata mapping of segment id to segment metadata. This is used to look up the
+     *                            RemoteLogSegmentMetadata for an id, which is needed for sorting.
+     * @return an iterator over the metadata of all segments associated with this leader epoch.
+     */
+    Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments(Map<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata) {
+        // Return all the segments, including unreferenced metadata.
+        int size = offsetToId.size() + unreferencedSegmentIds.size();
+        if (size == 0) {
+            return Collections.emptyIterator();
+        }
+
+        ArrayList<RemoteLogSegmentMetadata> metadataList = new ArrayList<>(size);
+        for (RemoteLogSegmentId id : offsetToId.values()) {
+            metadataList.add(idToSegmentMetadata.get(id));
+        }
+
+        if (!unreferencedSegmentIds.isEmpty()) {
+            for (RemoteLogSegmentId id : unreferencedSegmentIds) {
+                metadataList.add(idToSegmentMetadata.get(id));
+            }
+
+            // Sort only when unreferenced entries exist, as the entries in offsetToId are already sorted.
+            metadataList.sort(Comparator.comparingLong(RemoteLogSegmentMetadata::startOffset));
+        }
+
+        return metadataList.iterator();
+    }
+
+    void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
+                                                   Long leaderEpochEndOffset) {
+        // Add the offset to segment-id mapping as the segment is copied successfully.
+        RemoteLogSegmentId oldEntry = offsetToId.put(startOffset, remoteLogSegmentId);
+
+        // Remove the metadata from unreferenced entries as it is successfully copied and added to the offset mapping.
+        unreferencedSegmentIds.remove(remoteLogSegmentId);
+
+        // Add the old entry to unreferenced entries, as its mapping has been removed.
+        if (oldEntry != null) {
+            unreferencedSegmentIds.add(oldEntry);
+        }
+
+        // Update the highest offset entry for this leader epoch as we added a new mapping.
+        maybeUpdateHighestLogOffset(leaderEpochEndOffset);
+    }
+
+    void handleSegmentWithDeleteSegmentStartedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
+                                                    Long leaderEpochEndOffset) {
+        // Remove the offset mapping as this segment is getting deleted.
+        offsetToId.remove(startOffset, remoteLogSegmentId);
+
+        // Add this entry to the unreferenced set for the leader epoch as it is being deleted.
+        // This allows retries of deletion, as these entries are returned from listAllSegments and listSegments(leaderEpoch).
+        unreferencedSegmentIds.add(remoteLogSegmentId);
+
+        // Update the highest offset entry for this leader epoch. This needs to be done as a segment can reach this
+        // state without going through the COPY_SEGMENT_FINISHED state.
+        maybeUpdateHighestLogOffset(leaderEpochEndOffset);

Review comment:
   Sounds good. Could you make the change in the PR?
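For illustration, a sketch of how these two handlers move a segment between the referenced offset mapping and the unreferenced set — assuming package-private access and a pre-built TopicIdPartition named topicIdPartition (hypothetical, not code from the PR):

    // Illustrative walk-through only, assuming access from the same package.
    RemoteLogLeaderEpochState state = new RemoteLogLeaderEpochState();
    RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());

    // COPY_SEGMENT_FINISHED: the segment becomes referenced via the offset mapping,
    // so it is visible to reads through the offset-based lookups.
    state.handleSegmentWithCopySegmentFinishedState(0L, segmentId, 99L);

    // DELETE_SEGMENT_STARTED: the segment moves to the unreferenced set. It is no longer
    // readable via the offset mapping, but the listing methods still return it for cleanup.
    state.handleSegmentWithDeleteSegmentStartedState(0L, segmentId, 99L);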
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org