FrankYang0529 commented on code in PR #20204:
URL: https://github.com/apache/kafka/pull/20204#discussion_r2218550322


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##########
@@ -392,93 +316,74 @@ public boolean isReady(TopicIdPartition topicIdPartition) {
         return remotePartitionMetadataStore.isInitialized(topicIdPartition);
     }
 
-    private void initializeResources() {
+    private void handleRetry(long retryIntervalMs) {
+        log.info("Sleep for {} ms before retrying.", retryIntervalMs);
+        Utils.sleep(retryIntervalMs);
+    }
+
+    private void initializeResources(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig) {
         log.info("Initializing topic-based RLMM resources");
-        final NewTopic remoteLogMetadataTopicRequest = createRemoteLogMetadataTopicRequest();
-        boolean topicCreated = false;
+        int metadataTopicPartitionCount = rlmmConfig.metadataTopicPartitionsCount();
+        long retryIntervalMs = rlmmConfig.initializationRetryIntervalMs();
+        long retryMaxTimeoutMs = rlmmConfig.initializationRetryMaxTimeoutMs();
+        RemoteLogMetadataTopicPartitioner partitioner = partitionerFunction.apply(metadataTopicPartitionCount);
+        NewTopic newTopic = newRemoteLogMetadataTopic(rlmmConfig);
+        boolean isTopicCreated = false;
         long startTimeMs = time.milliseconds();
-        Admin adminClient = null;
-        try {
-            adminClient = Admin.create(rlmmConfig.commonProperties());
-            // Stop if it is already initialized or closing.
-            while (!(initialized.get() || closing.get())) {
-
-                // If it is timed out then raise an error to exit.
-                if (time.milliseconds() - startTimeMs > rlmmConfig.initializationRetryMaxTimeoutMs()) {
-                    log.error("Timed out in initializing the resources, retried to initialize the resource for {} ms.",
-                            rlmmConfig.initializationRetryMaxTimeoutMs());
+        try (Admin admin = Admin.create(rlmmConfig.commonProperties())) {
+            while (!(initialized.get() || closing.get() || initializationFailed)) {
+                if (time.milliseconds() - startTimeMs > retryMaxTimeoutMs) {
+                    log.error("Timed out to initialize the resources within {} ms.", retryMaxTimeoutMs);
                     initializationFailed = true;
-                    return;
-                }
-
-                if (!topicCreated) {
-                    topicCreated = createTopic(adminClient, remoteLogMetadataTopicRequest);
+                    break;
                 }
-
-                if (!topicCreated) {
-                    // Sleep for INITIALIZATION_RETRY_INTERVAL_MS before trying to create the topic again.
-                    log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
-                    Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
+                isTopicCreated = isTopicCreated || createTopic(admin, newTopic);
+                if (!isTopicCreated) {
+                    handleRetry(retryIntervalMs);
                     continue;
-                } else {
-                    // If topic is already created, validate the existing topic partitions.
-                    try {
-                        String topicName = remoteLogMetadataTopicRequest.name();
-                        // If the existing topic partition size is not same as configured, mark initialization as failed and exit.
-                        if (!isPartitionsCountSameAsConfigured(adminClient, topicName)) {
-                            initializationFailed = true;
-                        }
-                    } catch (Exception e) {
-                        log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
-                        Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
-                        continue;
+                }
+                try {
+                    if (!isPartitionsCountSameAsConfigured(admin, newTopic.name(), metadataTopicPartitionCount)) {
+                        initializationFailed = true;
+                        break;
                     }
+                } catch (Exception e) {
+                    handleRetry(retryIntervalMs);
+                    continue;
                 }

Review Comment:
   Do you know why we need a separate try-catch block here? If it is meant to handle `Admin` exceptions, do we also need a try-catch block around topic creation?
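
To make the question concrete, here is a minimal sketch of the alternative it hints at: a single try-catch around both steps, so that an exception from either one leads to a retry. It assumes topic creation can throw, which the diff does not confirm. `InitRetrySketch`, `Step`, `Result`, and the attempt counter are hypothetical stand-ins for the PR's time-based loop and `Admin` calls, not the actual Kafka code.

```java
/** Hypothetical stand-in for the initialization loop; not the Kafka implementation. */
final class InitRetrySketch {

    enum Result { INITIALIZED, FAILED, RETRIES_EXHAUSTED }

    /** A step that may throw, standing in for a call that uses the Admin client. */
    interface Step {
        boolean run() throws Exception;
    }

    /**
     * Runs both steps inside one try-catch. A bounded attempt count replaces
     * the time-based timeout and sleep of the real loop (an assumption made
     * to keep the sketch deterministic).
     */
    static Result initialize(Step createTopic, Step partitionCountMatches, int maxAttempts) {
        boolean isTopicCreated = false;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                // Short-circuits, so the topic is not created again once it exists.
                isTopicCreated = isTopicCreated || createTopic.run();
                if (!isTopicCreated) {
                    continue; // topic not created yet: retry
                }
                if (!partitionCountMatches.run()) {
                    return Result.FAILED; // mismatch is not retryable
                }
                return Result.INITIALIZED;
            } catch (Exception e) {
                // An exception from either step falls through to the next attempt.
            }
        }
        return Result.RETRIES_EXHAUSTED;
    }
}
```

With this shape, a transient failure while creating the topic and a transient failure while validating the partition count are handled the same way, while a partition-count mismatch still fails immediately.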



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##########
@@ -188,8 +156,7 @@ public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionD
      * @throws RemoteStorageException if there are any storage errors occur.
      */
     private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition topicIdPartition,
-                                                           RemoteLogMetadata remoteLogMetadata)
-            throws RemoteStorageException {
+                                                           RemoteLogMetadata remoteLogMetadata) throws RemoteStorageException {
         log.debug("Storing the partition: {} metadata: {}", topicIdPartition, remoteLogMetadata);

Review Comment:
   How about simplifying `storeRemoteLogMetadata`? `RemoteLogMetadata#topicIdPartition` already returns the `topicIdPartition`, so we can drop the separate parameter and rewrite the function like:
   
   ```java
       private CompletableFuture<Void> storeRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) throws RemoteStorageException {
           log.debug("Storing the partition: {} metadata: {}", remoteLogMetadata.topicIdPartition(), remoteLogMetadata);
           // ...
   ```


