Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use CompletableFuture and solve hang problem [gravitino]

2025-11-28 Thread via GitHub


yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2570758389


##
catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java:
##
@@ -32,7 +34,9 @@ public class HDFSFileSystemProvider implements FileSystemProvider {
   @Override
   public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String> config)
       throws IOException {
-    Configuration configuration = FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, config);
+    Map<String, String> hadoopConfMap = additionalHDFSConfig(config);
+    Configuration configuration =
+        FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, hadoopConfMap);

Review Comment:
   No need to import it as it's defined in the base class.
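
   For context on the point above: in Java, a constant declared in an implemented interface is inherited by the implementing class and resolves by simple name with no static import. A minimal self-contained sketch (the interface name and constant value here are hypothetical stand-ins, not Gravitino's actual definitions):

   ```java
   // Hypothetical stand-in for the FileSystemProvider interface's constant.
   interface BypassProvider {
     String GRAVITINO_BYPASS = "gravitino.bypass."; // illustrative value only
   }

   public class InheritedConstantSketch implements BypassProvider {
     public static void main(String[] args) {
       // The inherited interface field resolves by simple name, even in a
       // static context, with no import statement required.
       String prefix = GRAVITINO_BYPASS;
       System.out.println(prefix);
     }
   }
   ```

   This is why the static import suggested by the automated review is unnecessary when the provider implements the interface that declares the constant.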



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use CompletableFuture and solve hang problem [gravitino]

2025-11-27 Thread via GitHub


Copilot commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2570368218


##
catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java:
##
@@ -32,7 +34,9 @@ public class HDFSFileSystemProvider implements FileSystemProvider {
   @Override
   public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String> config)
       throws IOException {
-    Configuration configuration = FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, config);
+    Map<String, String> hadoopConfMap = additionalHDFSConfig(config);
+    Configuration configuration =
+        FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, hadoopConfMap);

Review Comment:
   `GRAVITINO_BYPASS` is not imported. This code references `GRAVITINO_BYPASS`, which is defined in the `FileSystemProvider` interface, but there is no static import for it. Add `import static org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider.GRAVITINO_BYPASS;` to fix the compilation error.



##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -138,6 +141,24 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
   @VisibleForTesting ScheduledThreadPoolExecutor scheduler;
   @VisibleForTesting Cache fileSystemCache;
 
+  private static final ThreadPoolExecutor GET_FILESYSTEM_EXECUTOR =
+      new ThreadPoolExecutor(
+          Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+          Math.max(2, Runtime.getRuntime().availableProcessors() * 2),

Review Comment:
   The core and maximum pool sizes are identical, making this effectively a 
fixed-size thread pool. However, `allowCoreThreadTimeOut(true)` is set, which 
means core threads can time out. This configuration is confusing because with 
identical core and max sizes, there are no "extra" threads beyond the core. 
Consider either: (1) using `Executors.newFixedThreadPool()` for clarity, or (2) 
setting different core and max sizes if scaling is intended.
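
   To make option (1) concrete, here is a sketch of the same sizing expressed with `Executors.newFixedThreadPool`, reusing the daemon thread factory from the diff above. One caveat worth noting: the factory method uses an unbounded queue, so the original `ArrayBlockingQueue(1000)` bound and `AbortPolicy` back-pressure would be lost.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ThreadFactory;

   public class FixedPoolSketch {
     public static void main(String[] args) {
       // Same sizing formula as the diff above.
       int poolSize = Math.max(2, Runtime.getRuntime().availableProcessors() * 2);
       ThreadFactory daemonFactory =
           r -> {
             Thread t = new Thread(r, "fileset-filesystem-getter-pool");
             t.setDaemon(true);
             return t;
           };
       // Fixed-size pool: core == max, so the allowCoreThreadTimeOut(true)
       // oddity disappears. Caveat: the work queue here is unbounded, unlike
       // the original ArrayBlockingQueue(1000) with AbortPolicy.
       ExecutorService pool = Executors.newFixedThreadPool(poolSize, daemonFactory);
       pool.shutdown();
       System.out.println(poolSize >= 2 && pool.isShutdown()); // true
     }
   }
   ```

   If the bounded queue and rejection policy are intentional, keeping the explicit `ThreadPoolExecutor` constructor but dropping `allowCoreThreadTimeOut(true)` would be the smaller change.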



##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -138,6 +141,24 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
   @VisibleForTesting ScheduledThreadPoolExecutor scheduler;
   @VisibleForTesting Cache fileSystemCache;
 
+  private static final ThreadPoolExecutor GET_FILESYSTEM_EXECUTOR =
+      new ThreadPoolExecutor(
+          Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+          Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+          50L,
+          TimeUnit.MILLISECONDS,
+          new ArrayBlockingQueue<>(1000),
+          r -> {
+            Thread t = new Thread(r, "fileset-filesystem-getter-pool");
+            t.setDaemon(true);
+            return t;
+          },
+          new ThreadPoolExecutor.AbortPolicy()) {
+        {
+          allowCoreThreadTimeOut(true);
+        }
+      };
+

Review Comment:
   The static `ThreadPoolExecutor` is never shut down, creating a resource 
leak. Unlike the `scheduler` field which is shut down in the `close()` method, 
this executor will continue running even after `FilesetCatalogOperations` 
instances are closed. Consider making this executor non-static and shutting it 
down in the `close()` method, or if it must be static, implement a shutdown 
hook or singleton lifecycle management.
   ```suggestion

  static {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      GET_FILESYSTEM_EXECUTOR.shutdown();
      try {
        if (!GET_FILESYSTEM_EXECUTOR.awaitTermination(10, TimeUnit.SECONDS)) {
          GET_FILESYSTEM_EXECUTOR.shutdownNow();
        }
      } catch (InterruptedException e) {
        GET_FILESYSTEM_EXECUTOR.shutdownNow();
        Thread.currentThread().interrupt();
      }
    }, "fileset-filesystem-getter-pool-shutdown-hook"));
  }
   ```
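
   The non-static alternative mentioned in the comment could look like the following sketch. The class and field names here are hypothetical stand-ins; the real change would move the executor into `FilesetCatalogOperations` and extend its existing `close()`.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   // Hypothetical stand-in for FilesetCatalogOperations with an instance-level pool.
   public class CloseableOpsSketch implements AutoCloseable {
     private final ExecutorService getFileSystemExecutor = Executors.newFixedThreadPool(2);

     @Override
     public void close() {
       // Same drain logic as the shutdown-hook suggestion, but tied to the
       // object's lifecycle instead of JVM exit.
       getFileSystemExecutor.shutdown();
       try {
         if (!getFileSystemExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
           getFileSystemExecutor.shutdownNow();
         }
       } catch (InterruptedException e) {
         getFileSystemExecutor.shutdownNow();
         Thread.currentThread().interrupt();
       }
     }

     boolean isClosed() {
       return getFileSystemExecutor.isShutdown();
     }

     public static void main(String[] args) {
       CloseableOpsSketch ops = new CloseableOpsSketch();
       ops.close();
       System.out.println(ops.isClosed()); // true
     }
   }
   ```

   The trade-off: an instance-level pool ties thread lifetime to the catalog's lifecycle, while a static pool shared across instances needs JVM-level cleanup such as the shutdown hook suggested above.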



##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -1397,30 +1418,35 @@ FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException
             .catalogPropertiesMetadata()
             .getOrDefault(
                 config, FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
+
+    Future<FileSystem> fileSystemFuture =
+        GET_FILESYSTEM_EXECUTOR.submit(() -> provider.getFileSystem(path, config));

Review Comment:
   The new async FileSystem retrieval mechanism with timeout handling lacks 
test coverage. There are no tests verifying: (1) timeout behavior when 
FileSystem creation hangs, (2) proper exception propagation from 
ExecutionException, or (3) cancellation handling. Consider adding tests that 
mock a slow `FileSystemProvider` to verify the timeout mechanism works 
correctly.
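
   A sketch of the kind of test the comment asks for, using a plain `Callable` that sleeps to stand in for a hanging `FileSystemProvider`. The names are hypothetical; a real test would submit through the PR's executor and assert on the wrapped exception types as well.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class TimeoutBehaviorSketch {

     // Stand-in for the PR's getFileSystem() wait: submit a hanging task and
     // bound the wait with Future.get(timeout).
     static String fetchWithTimeout(long timeoutMillis) throws Exception {
       ExecutorService pool = Executors.newSingleThreadExecutor();
       Future<String> fileSystemFuture =
           pool.submit(
               () -> {
                 Thread.sleep(60_000); // simulates a FileSystem creation that hangs
                 return "filesystem";
               });
       try {
         return fileSystemFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
       } catch (TimeoutException e) {
         fileSystemFuture.cancel(true); // interrupt the hung creation attempt
         return "timed out";
       } finally {
         pool.shutdownNow();
       }
     }

     public static void main(String[] args) throws Exception {
       System.out.println(fetchWithTimeout(100)); // timed out
     }
   }
   ```

   Tests along these lines would cover the timeout path; separate cases could force the task to throw, verifying that the `ExecutionException` cause propagates, and interrupt the waiting thread to exercise cancellation.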



##
catalogs/catalog-fileset/src/main/java/org/apache/gravit