FrankYang0529 commented on code in PR #20204: URL: https://github.com/apache/kafka/pull/20204#discussion_r2218550322
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java: ########## @@ -392,93 +316,74 @@ public boolean isReady(TopicIdPartition topicIdPartition) { return remotePartitionMetadataStore.isInitialized(topicIdPartition); } - private void initializeResources() { + private void handleRetry(long retryIntervalMs) { + log.info("Sleep for {} ms before retrying.", retryIntervalMs); + Utils.sleep(retryIntervalMs); + } + + private void initializeResources(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig) { log.info("Initializing topic-based RLMM resources"); - final NewTopic remoteLogMetadataTopicRequest = createRemoteLogMetadataTopicRequest(); - boolean topicCreated = false; + int metadataTopicPartitionCount = rlmmConfig.metadataTopicPartitionsCount(); + long retryIntervalMs = rlmmConfig.initializationRetryIntervalMs(); + long retryMaxTimeoutMs = rlmmConfig.initializationRetryMaxTimeoutMs(); + RemoteLogMetadataTopicPartitioner partitioner = partitionerFunction.apply(metadataTopicPartitionCount); + NewTopic newTopic = newRemoteLogMetadataTopic(rlmmConfig); + boolean isTopicCreated = false; long startTimeMs = time.milliseconds(); - Admin adminClient = null; - try { - adminClient = Admin.create(rlmmConfig.commonProperties()); - // Stop if it is already initialized or closing. - while (!(initialized.get() || closing.get())) { - - // If it is timed out then raise an error to exit. - if (time.milliseconds() - startTimeMs > rlmmConfig.initializationRetryMaxTimeoutMs()) { - log.error("Timed out in initializing the resources, retried to initialize the resource for {} ms.", - rlmmConfig.initializationRetryMaxTimeoutMs()); + try (Admin admin = Admin.create(rlmmConfig.commonProperties())) { + while (!(initialized.get() || closing.get() || initializationFailed)) { + if (time.milliseconds() - startTimeMs > retryMaxTimeoutMs) { + log.error("Timed out to initialize the resources within {} ms.", retryMaxTimeoutMs); initializationFailed = true; - return; - } - - if (!topicCreated) { - topicCreated = createTopic(adminClient, remoteLogMetadataTopicRequest); + break; } - - if (!topicCreated) { - // Sleep for INITIALIZATION_RETRY_INTERVAL_MS before trying to create the topic again. - log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs()); - Utils.sleep(rlmmConfig.initializationRetryIntervalMs()); + isTopicCreated = isTopicCreated || createTopic(admin, newTopic); + if (!isTopicCreated) { + handleRetry(retryIntervalMs); continue; - } else { - // If topic is already created, validate the existing topic partitions. - try { - String topicName = remoteLogMetadataTopicRequest.name(); - // If the existing topic partition size is not same as configured, mark initialization as failed and exit. - if (!isPartitionsCountSameAsConfigured(adminClient, topicName)) { - initializationFailed = true; - } - } catch (Exception e) { - log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs()); - Utils.sleep(rlmmConfig.initializationRetryIntervalMs()); - continue; + } + try { + if (!isPartitionsCountSameAsConfigured(admin, newTopic.name(), metadataTopicPartitionCount)) { + initializationFailed = true; + break; } + } catch (Exception e) { + handleRetry(retryIntervalMs); + continue; } Review Comment: Do you know why we need another try-catch block here? If it's about `Admin` exception, do we need another try-catch block for creating topic? ########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java: ########## @@ -188,8 +156,7 @@ public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionD * @throws RemoteStorageException if there are any storage errors occur. */ private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition topicIdPartition, - RemoteLogMetadata remoteLogMetadata) - throws RemoteStorageException { + RemoteLogMetadata remoteLogMetadata) throws RemoteStorageException { log.debug("Storing the partition: {} metadata: {}", topicIdPartition, remoteLogMetadata); Review Comment: How about simplifying `storeRemoteLogMetadata`? The `RemoteLogMetadata#topicIdPartition` can get `topicIdPartition`, so we can rewrite the function like: ```java private CompletableFuture<Void> storeRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) throws RemoteStorageException { log.debug("Storing the partition: {} metadata: {}", remoteLogMetadata.topicIdPartition(), remoteLogMetadata); // ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org