satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r712009321
##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
##########
@@ -42,21 +45,36 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHandler implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(RemotePartitionMetadataStore.class);
 
+    private final Path logDir;
+
     private Map<TopicIdPartition, RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
             new ConcurrentHashMap<>();
 
-    private Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache =
+    private Map<TopicIdPartition, FileBasedRemoteLogMetadataCache> idToRemoteLogMetadataCache =
             new ConcurrentHashMap<>();
 
+    public RemotePartitionMetadataStore(Path logDir) {
+        this.logDir = logDir;
+    }
+
     @Override
     public void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
         log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata);
 
-        RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
+        final RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
+        TopicIdPartition topicIdPartition = remoteLogSegmentId.topicIdPartition();
+
+        // This should have been already existing as it is loaded when the partitions are assigned.
+        RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
+        if (remoteLogMetadataCache != null) {
+            remoteLogMetadataCache.addCopyInProgressSegment(remoteLogSegmentMetadata);
+        } else {
+            log.error("No partition metadata found for : " + topicIdPartition);

Review comment:
Even if we throw an exception here, the caller still needs to handle the error by logging it. This can happen when a partition is deleted but an event for that same partition is later consumed by the ConsumerTask.
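To illustrate the point in the review comment, here is a minimal, self-contained sketch of the log-and-skip behaviour. It is not the PR's code: `PartitionEventStoreSketch`, its string keys, and `skippedCount()` are hypothetical stand-ins for `RemotePartitionMetadataStore`, `TopicIdPartition`, and the per-partition cache, and `java.util.logging` is used in place of SLF4J only so the example runs without dependencies.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

// Hypothetical stand-in for the store in the diff; none of these names are Kafka APIs.
class PartitionEventStoreSketch {
    private static final Logger LOG = Logger.getLogger(PartitionEventStoreSketch.class.getName());

    // Per-partition state: created when a partition is assigned, removed when it is deleted.
    private final Map<String, List<String>> partitionToSegments = new ConcurrentHashMap<>();

    // Counts events that arrived for a partition with no state (assumption: useful for metrics/tests).
    private final AtomicInteger skipped = new AtomicInteger();

    void assignPartition(String partition) {
        partitionToSegments.putIfAbsent(partition, new CopyOnWriteArrayList<>());
    }

    void deletePartition(String partition) {
        partitionToSegments.remove(partition);
    }

    // Applies the event if the partition is known; otherwise logs and skips it.
    // Returns true when the event was applied.
    boolean handleSegmentEvent(String partition, String segmentId) {
        List<String> segments = partitionToSegments.get(partition);
        if (segments == null) {
            // An event can still be consumed after its partition was deleted.
            // Throwing here would only move the logging to the caller.
            skipped.incrementAndGet();
            LOG.severe("No partition metadata found for : " + partition);
            return false;
        }
        segments.add(segmentId);
        return true;
    }

    int skippedCount() {
        return skipped.get();
    }
}
```

In this sketch a late event for a deleted partition is logged once at the point where the missing state is detected, and the consuming loop keeps running. Whether the real handler should also surface a return value or a metric is a separate design choice and is only assumed here.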