satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r712009321



##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
##########
@@ -42,21 +45,36 @@
 public class RemotePartitionMetadataStore extends 
RemotePartitionMetadataEventHandler implements Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(RemotePartitionMetadataStore.class);
 
+    private final Path logDir;
+
     private Map<TopicIdPartition, RemotePartitionDeleteMetadata> 
idToPartitionDeleteMetadata =
             new ConcurrentHashMap<>();
 
-    private Map<TopicIdPartition, RemoteLogMetadataCache> 
idToRemoteLogMetadataCache =
+    private Map<TopicIdPartition, FileBasedRemoteLogMetadataCache> 
idToRemoteLogMetadataCache =
             new ConcurrentHashMap<>();
 
+    public RemotePartitionMetadataStore(Path logDir) {
+        this.logDir = logDir;
+    }
+
     @Override
     public void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
         log.debug("Adding remote log segment : [{}]", 
remoteLogSegmentMetadata);
 
-        RemoteLogSegmentId remoteLogSegmentId = 
remoteLogSegmentMetadata.remoteLogSegmentId();
+        final RemoteLogSegmentId remoteLogSegmentId = 
remoteLogSegmentMetadata.remoteLogSegmentId();
+        TopicIdPartition topicIdPartition = 
remoteLogSegmentId.topicIdPartition();
+
+        // This should have been already existing as it is loaded when the 
partitions are assigned.
+        RemoteLogMetadataCache remoteLogMetadataCache = 
idToRemoteLogMetadataCache.get(topicIdPartition);
+        if (remoteLogMetadataCache != null) {
+            
remoteLogMetadataCache.addCopyInProgressSegment(remoteLogSegmentMetadata);
+        } else {
+            log.error("No partition metadata found for : " + topicIdPartition);

Review comment:
       Even if we throw an exception here, caller needs to handle these errors 
by logging them.  This can happen when a partition is deleted but an event for 
the same partition is consumed by the ConsumerTask.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to