Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao merged PR #9282: URL: https://github.com/apache/gravitino/pull/9282 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591560852
##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -1395,30 +1417,43 @@ FileSystem getFileSystem(Path path, Map<String, String> config) throws IOExcepti
             .catalogPropertiesMetadata()
             .getOrDefault(
                 config,
                 FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
+
+    Future<FileSystem> fileSystemFuture =
+        fileSystemExecutor.submit(() -> provider.getFileSystem(path, config));
+
     try {
-      AtomicReference<FileSystem> fileSystem = new AtomicReference<>();
-      Awaitility.await()
-          .atMost(timeoutSeconds, TimeUnit.SECONDS)
-          .pollInterval(1, TimeUnit.MILLISECONDS)
-          .until(
-              () -> {
-                fileSystem.set(provider.getFileSystem(path, config));
-                return true;
-              });
-      return fileSystem.get();
-    } catch (ConditionTimeoutException e) {
+      return fileSystemFuture.get(timeoutSeconds, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      fileSystemFuture.cancel(true);
+
+      LOG.warn(
+          "Timeout when getting FileSystem for path: {}, scheme: {}, provider: {} within {} seconds",
+          path,
+          scheme,
+          provider,
+          timeoutSeconds,
+          e);
+
       throw new IOException(
           String.format(
-              "Failed to get FileSystem for path: %s, scheme: %s, provider: %s, config: %s within %s "
+              "Failed to get FileSystem for path: %s, scheme: %s, provider: %s within %s "
                   + "seconds, please check the configuration or increase the "
                   + "file system connection timeout time by setting catalog property: %s",
               path,
               scheme,
               provider,
-              config,
               timeoutSeconds,
               FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS),
           e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while waiting for FileSystem", e);
Review Comment:
I think you don't understand what I mean. `InterruptedException` often
happens when closing or shutting down. This is expected, so it should not be
rethrown as an `IOException`.
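For readers following the thread, here is a minimal, self-contained sketch of the `Future.get(timeout)` pattern under discussion. The class `TimedCallSketch` and the helper `callWithTimeout` are hypothetical names used only for illustration and are not part of Gravitino. The sketch mirrors the quoted diff by restoring the interrupt flag and wrapping the exception; whether an interrupt should surface as an `IOException` at all is exactly the open question in this review, so treat that branch as one option rather than the agreed behavior.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class; not part of the PR.
class TimedCallSketch {

  // Runs the task on the given executor and waits at most timeoutSeconds for its result.
  static <T> T callWithTimeout(ExecutorService executor, Callable<T> task, long timeoutSeconds)
      throws IOException {
    Future<T> future = executor.submit(task);
    try {
      return future.get(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      // Ask the worker thread to stop; this only helps if the task reacts to interrupts.
      future.cancel(true);
      throw new IOException("Timed out after " + timeoutSeconds + " seconds", e);
    } catch (InterruptedException e) {
      // The waiting thread was interrupted, for example during shutdown.
      future.cancel(true);
      Thread.currentThread().interrupt(); // restore the flag for callers
      throw new IOException("Interrupted while waiting for the task", e);
    } catch (ExecutionException e) {
      // The task itself failed; surface its cause.
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;
      }
      throw new IOException("Task failed", cause);
    }
  }
}
```

Unlike a polling loop that invokes the blocking call on the waiting thread, the timeout here bounds the wait itself, so the caller returns even if the task never does.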
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591543020
##
catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java:
##
@@ -1868,6 +1874,42 @@ public void testGetTargetLocation() throws IOException {
}
}
+ @Test
+ @Timeout(15)
+ void testGetFileSystemTimeoutThrowsException() throws Exception {
+FieldUtils.writeField(
+GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+
+try (FilesetCatalogOperations filesetCatalogOperations = new
FilesetCatalogOperations()) {
+ LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
+ when(localFileSystemProvider.scheme()).thenReturn("file");
+ when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
+ .thenAnswer(
+ invocation -> {
+// Block 100s, however, the timeout is set to 6s by default in
+// FilesetCatalogOperations, so it's expected to be over
within 10s
+Awaitility.await().forever().until(() -> false);
+return new LocalFileSystem();
+ });
+ Map fileSystemProviderMapOriginal = new
HashMap<>();
+ fileSystemProviderMapOriginal.put("file", localFileSystemProvider);
+ FieldUtils.writeField(
+ filesetCatalogOperations, "fileSystemProvidersMap",
fileSystemProviderMapOriginal, true);
+
+ FieldUtils.writeField(
+ filesetCatalogOperations, "propertiesMetadata",
FILESET_PROPERTIES_METADATA, true);
+
+ // Test the following method should finish with 10s
+ long now = System.currentTimeMillis();
+ try {
+filesetCatalogOperations.getFileSystem(new Path("file:///tmp"),
ImmutableMap.of());
+ } catch (IOException e) {
+long timeTake = System.currentTimeMillis() - now;
+Assertions.assertTrue(timeTake <= 1);
Review Comment:
I replaced it with the `@Timeout(15)` annotation.
##
catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java:
##
@@ -48,4 +55,29 @@ public String scheme() {
   public String name() {
     return BUILTIN_HDFS_FS_PROVIDER;
   }
+
+  /**
+   * Add additional HDFS specific configurations.
+   *
+   * @param configs Original configurations.
+   * @return Configurations with additional HDFS specific configurations.
+   */
+  private Map<String, String> additionalHDFSConfig(Map<String, String> configs) {
+    Map<String, String> additionalConfigs = Maps.newHashMap(configs);
+
+    // Avoid multiple retries to speed up failure in test cases.
+    // Use hard code instead of CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY to
+    // avoid dependency on a specific Hadoop version.
+    if (!configs.containsKey(HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY)) {
+      additionalConfigs.put(HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT);
+    }
+
+    if (!configs.containsKey(HDFS_IPC_PING_KEY)) {
+      additionalConfigs.put(HDFS_IPC_PING_KEY, "true");
changed
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591541654
##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -137,6 +140,23 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache fileSystemCache;
+ private final ThreadPoolExecutor fileSystemExecutor =
+ new ThreadPoolExecutor(
+ Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2,
16)),
+ Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2,
32)),
+ 5L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(1000),
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("fileset-filesystem-getter-pool-%d")
+ .build(),
+ new ThreadPoolExecutor.AbortPolicy()) {
+{
+ allowCoreThreadTimeOut(true);
+}
+ };
Review Comment:
added
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591541288
##
catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java:
##
@@ -1868,6 +1874,42 @@ public void testGetTargetLocation() throws IOException {
}
}
+ @Test
+ @Timeout(15)
+ void testGetFileSystemTimeoutThrowsException() throws Exception {
+FieldUtils.writeField(
+GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+
+try (FilesetCatalogOperations filesetCatalogOperations = new
FilesetCatalogOperations()) {
+ LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
+ when(localFileSystemProvider.scheme()).thenReturn("file");
+ when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
+ .thenAnswer(
+ invocation -> {
+// Block 100s, however, the timeout is set to 6s by default in
+// FilesetCatalogOperations, so it's expected to be over
within 10s
+Awaitility.await().forever().until(() -> false);
+return new LocalFileSystem();
+ });
+ Map fileSystemProviderMapOriginal = new
HashMap<>();
+ fileSystemProviderMapOriginal.put("file", localFileSystemProvider);
+ FieldUtils.writeField(
+ filesetCatalogOperations, "fileSystemProvidersMap",
fileSystemProviderMapOriginal, true);
+
+ FieldUtils.writeField(
+ filesetCatalogOperations, "propertiesMetadata",
FILESET_PROPERTIES_METADATA, true);
+
+ // Test the following method should finish with 10s
+ long now = System.currentTimeMillis();
+ try {
+filesetCatalogOperations.getFileSystem(new Path("file:///tmp"),
ImmutableMap.of());
+ } catch (IOException e) {
+long timeTake = System.currentTimeMillis() - now;
+Assertions.assertTrue(timeTake <= 1);
+ }
Review Comment:
Changed.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591536255
##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -1395,30 +1417,43 @@ FileSystem getFileSystem(Path path, Map
config) throws IOExcepti
.catalogPropertiesMetadata()
.getOrDefault(
config,
FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
+
+Future fileSystemFuture =
+fileSystemExecutor.submit(() -> provider.getFileSystem(path, config));
+
try {
- AtomicReference fileSystem = new AtomicReference<>();
- Awaitility.await()
- .atMost(timeoutSeconds, TimeUnit.SECONDS)
- .pollInterval(1, TimeUnit.MILLISECONDS)
- .until(
- () -> {
-fileSystem.set(provider.getFileSystem(path, config));
-return true;
- });
- return fileSystem.get();
-} catch (ConditionTimeoutException e) {
+ return fileSystemFuture.get(timeoutSeconds, TimeUnit.SECONDS);
+} catch (TimeoutException e) {
+ fileSystemFuture.cancel(true);
+
+ LOG.warn(
+ "Timeout when getting FileSystem for path: {}, scheme: {}, provider:
{} within {} seconds",
+ path,
+ scheme,
+ provider,
+ timeoutSeconds,
+ e);
+
throw new IOException(
String.format(
- "Failed to get FileSystem for path: %s, scheme: %s, provider:
%s, config: %s within %s "
+ "Failed to get FileSystem for path: %s, scheme: %s, provider: %s
within %s "
+ "seconds, please check the configuration or increase the "
+ "file system connection timeout time by setting catalog
property: %s",
path,
scheme,
provider,
- config,
timeoutSeconds,
FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS),
e);
+} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for FileSystem", e);
Review Comment:
Normally, a `TimeoutException` occurs if the call hangs for a long time; an
`InterruptedException` is thrown only when we interrupt it deliberately.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591338832
##
catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java:
##
@@ -48,4 +55,29 @@ public String scheme() {
public String name() {
return BUILTIN_HDFS_FS_PROVIDER;
}
+
+ /**
+ * Add additional HDFS specific configurations.
+ *
+ * @param configs Original configurations.
+ * @return Configurations with additional HDFS specific configurations.
+ */
+ private Map additionalHDFSConfig(Map
configs) {
+Map additionalConfigs = Maps.newHashMap(configs);
+
+// Avoid multiple retries to speed up failure in test cases.
+// Use hard code instead of
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY to
+// avoid dependency on a specific Hadoop version.
+if (!configs.containsKey(HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY)) {
+ additionalConfigs.put(HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY,
DEFAULT_CONNECTION_TIMEOUT);
+}
+
+if (!configs.containsKey(HDFS_IPC_PING_KEY)) {
+ additionalConfigs.put(HDFS_IPC_PING_KEY, "true");
Review Comment:
I see several custom configuration values here and above; you should
define constants for these as well.
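A rough sketch of what the suggestion could look like: every key and default value is a named constant, and defaults are applied only when the caller has not set the key. The class name `HdfsDefaultsSketch`, the literal key strings, and the default values below are assumptions made for illustration; the quoted diff does not show the actual values behind `HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY`, `HDFS_IPC_PING_KEY`, or `DEFAULT_CONNECTION_TIMEOUT`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of the PR.
class HdfsDefaultsSketch {
  // Assumed key strings; verify against the Hadoop version in use.
  static final String IPC_CLIENT_CONNECT_TIMEOUT_KEY = "ipc.client.connect.timeout";
  static final String IPC_PING_KEY = "ipc.client.ping";

  // Assumed default values, chosen only for this example.
  static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
  static final String DEFAULT_IPC_PING = "true";

  // Returns a copy of configs with defaults added for keys the caller did not set.
  static Map<String, String> withDefaults(Map<String, String> configs) {
    Map<String, String> result = new HashMap<>(configs);
    result.putIfAbsent(IPC_CLIENT_CONNECT_TIMEOUT_KEY, DEFAULT_CONNECT_TIMEOUT_MS);
    result.putIfAbsent(IPC_PING_KEY, DEFAULT_IPC_PING);
    return result;
  }
}
```

Note that `putIfAbsent` also fills in a key that is present but mapped to `null`, which differs slightly from the `containsKey` check in the quoted diff.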
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591336406
##
catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java:
##
@@ -1868,6 +1874,42 @@ public void testGetTargetLocation() throws IOException {
}
}
+ @Test
+ @Timeout(15)
+ void testGetFileSystemTimeoutThrowsException() throws Exception {
+FieldUtils.writeField(
+GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+
+try (FilesetCatalogOperations filesetCatalogOperations = new
FilesetCatalogOperations()) {
+ LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
+ when(localFileSystemProvider.scheme()).thenReturn("file");
+ when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
+ .thenAnswer(
+ invocation -> {
+// Block 100s, however, the timeout is set to 6s by default in
+// FilesetCatalogOperations, so it's expected to be over
within 10s
+Awaitility.await().forever().until(() -> false);
+return new LocalFileSystem();
+ });
+ Map fileSystemProviderMapOriginal = new
HashMap<>();
+ fileSystemProviderMapOriginal.put("file", localFileSystemProvider);
+ FieldUtils.writeField(
+ filesetCatalogOperations, "fileSystemProvidersMap",
fileSystemProviderMapOriginal, true);
+
+ FieldUtils.writeField(
+ filesetCatalogOperations, "propertiesMetadata",
FILESET_PROPERTIES_METADATA, true);
+
+ // Test the following method should finish with 10s
+ long now = System.currentTimeMillis();
+ try {
+filesetCatalogOperations.getFileSystem(new Path("file:///tmp"),
ImmutableMap.of());
+ } catch (IOException e) {
+long timeTake = System.currentTimeMillis() - now;
+Assertions.assertTrue(timeTake <= 1);
Review Comment:
I guess this will be very flaky if the test machine is under heavy load.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591332889
##
catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/HadoopUserAuthenticationIT.java:
##
@@ -516,6 +516,7 @@ void createFilesetWithKerberos() {
null,
tableProperty));
exceptionMessage = Throwables.getStackTraceAsString(exception);
+LOG.info("Exception message: {}", exceptionMessage);
Review Comment:
Remove this unnecessary log if it is only for debug.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591323210
##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -1395,30 +1417,43 @@ FileSystem getFileSystem(Path path, Map
config) throws IOExcepti
.catalogPropertiesMetadata()
.getOrDefault(
config,
FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
+
+Future fileSystemFuture =
+fileSystemExecutor.submit(() -> provider.getFileSystem(path, config));
+
try {
- AtomicReference fileSystem = new AtomicReference<>();
- Awaitility.await()
- .atMost(timeoutSeconds, TimeUnit.SECONDS)
- .pollInterval(1, TimeUnit.MILLISECONDS)
- .until(
- () -> {
-fileSystem.set(provider.getFileSystem(path, config));
-return true;
- });
- return fileSystem.get();
-} catch (ConditionTimeoutException e) {
+ return fileSystemFuture.get(timeoutSeconds, TimeUnit.SECONDS);
+} catch (TimeoutException e) {
+ fileSystemFuture.cancel(true);
+
+ LOG.warn(
+ "Timeout when getting FileSystem for path: {}, scheme: {}, provider:
{} within {} seconds",
+ path,
+ scheme,
+ provider,
+ timeoutSeconds,
+ e);
+
throw new IOException(
String.format(
- "Failed to get FileSystem for path: %s, scheme: %s, provider:
%s, config: %s within %s "
+ "Failed to get FileSystem for path: %s, scheme: %s, provider: %s
within %s "
+ "seconds, please check the configuration or increase the "
+ "file system connection timeout time by setting catalog
property: %s",
path,
scheme,
provider,
- config,
timeoutSeconds,
FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS),
e);
+} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for FileSystem", e);
Review Comment:
I think this is expected, shall we throw an exception here?
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591323139
##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -137,6 +140,23 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache fileSystemCache;
+ private final ThreadPoolExecutor fileSystemExecutor =
+ new ThreadPoolExecutor(
+ Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2, 16)),
Review Comment:
I have enabled core thread timeout, so the threads will stop if no task
is assigned to the pool.
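To make the idle-thread behavior concrete, here is a small sketch of a bounded pool whose core threads are also allowed to time out. `IdlePoolSketch` and `newPool` are hypothetical names, and the sizes and keep-alive are parameters rather than the values used in the PR.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of the PR.
class IdlePoolSketch {

  // Builds a pool whose threads, including core threads, exit after being idle
  // for keepAliveMillis. keepAliveMillis must be greater than zero, otherwise
  // allowCoreThreadTimeOut(true) throws IllegalArgumentException.
  static ThreadPoolExecutor newPool(int coreThreads, int maxThreads, long keepAliveMillis) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            coreThreads,
            maxThreads,
            keepAliveMillis,
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadPoolExecutor.AbortPolicy());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }
}
```

With this setting, `getPoolSize()` drops back to zero once the pool has been idle for longer than the keep-alive time. One design point worth keeping in mind: with a bounded queue, a `ThreadPoolExecutor` starts threads beyond the core size only after the queue is full, so the maximum size matters only under heavy backlog.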
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591320459
##
catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java:
##
@@ -1868,6 +1874,42 @@ public void testGetTargetLocation() throws IOException {
}
}
+ @Test
+ @Timeout(15)
+ void testGetFileSystemTimeoutThrowsException() throws Exception {
+FieldUtils.writeField(
+GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+
+try (FilesetCatalogOperations filesetCatalogOperations = new
FilesetCatalogOperations()) {
+ LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
+ when(localFileSystemProvider.scheme()).thenReturn("file");
+ when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
+ .thenAnswer(
+ invocation -> {
+// Block 100s, however, the timeout is set to 6s by default in
+// FilesetCatalogOperations, so it's expected to be over
within 10s
+Awaitility.await().forever().until(() -> false);
Review Comment:
`Thread.sleep(Long.MAX_VALUE)` is not recommended here.
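As a possible third option, not proposed anywhere in this thread, a `CountDownLatch` that is never released can simulate a hanging call without polling and without a magic sleep duration. The sketch below uses the hypothetical names `BlockingStubSketch` and `blockingTask`; it parks the thread until the latch is released or the thread is interrupted, so it also responds to `Future.cancel(true)`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration class; not part of the PR.
class BlockingStubSketch {

  // Returns a task that blocks until the latch is released or the thread is interrupted.
  static Callable<String> blockingTask(CountDownLatch release) {
    return () -> {
      // await() parks the thread without spinning and throws
      // InterruptedException if the thread is interrupted.
      release.await();
      return "released";
    };
  }
}
```

In a test, passing a latch that is never counted down makes the stub hang until the caller's timeout fires, while counting it down lets the same stub complete normally.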
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591319415
##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -137,6 +140,23 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache fileSystemCache;
+ private final ThreadPoolExecutor fileSystemExecutor =
+ new ThreadPoolExecutor(
+ Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2, 16)),
Review Comment:
Do we need to keep such threads active for use? My feeling is that most of
the threads can be swept when idle to save resources.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
Copilot commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2591311749
##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -137,6 +140,23 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache fileSystemCache;
+ private final ThreadPoolExecutor fileSystemExecutor =
+ new ThreadPoolExecutor(
+ Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2,
16)),
+ Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2,
32)),
+ 5L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(1000),
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("fileset-filesystem-getter-pool-%d")
+ .build(),
+ new ThreadPoolExecutor.AbortPolicy()) {
+{
+ allowCoreThreadTimeOut(true);
+}
+ };
Review Comment:
The ThreadPoolExecutor is initialized as a final instance field, but the
`close()` method calls `shutdownNow()` without checking if it has already been
shut down. If `close()` is called multiple times, this could throw an exception
or cause issues. Consider adding a guard to check the executor's state before
shutting it down, or use a safer shutdown pattern.
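One way to read the "safer shutdown pattern" suggestion is an explicit one-shot guard around the shutdown call. The sketch below is an assumption about what such a guard could look like; `GuardedCloseSketch` is a hypothetical class and does not reflect how `FilesetCatalogOperations.close()` is actually written. As far as I know, calling `shutdownNow()` repeatedly on a `ThreadPoolExecutor` does not throw, so the guard mainly documents intent and protects any other cleanup placed in the same branch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration class; not part of the PR.
class GuardedCloseSketch implements AutoCloseable {
  private final ExecutorService executor = Executors.newFixedThreadPool(2);
  private final AtomicBoolean closed = new AtomicBoolean(false);

  @Override
  public void close() {
    // Only the first caller performs the shutdown; later calls are no-ops.
    if (closed.compareAndSet(false, true)) {
      executor.shutdownNow();
    }
  }

  // Exposed for the example so the state can be inspected.
  boolean isExecutorShutdown() {
    return executor.isShutdown();
  }
}
```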
##
catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java:
##
@@ -1868,6 +1874,42 @@ public void testGetTargetLocation() throws IOException {
}
}
+ @Test
+ @Timeout(15)
+ void testGetFileSystemTimeoutThrowsException() throws Exception {
+FieldUtils.writeField(
+GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+
+try (FilesetCatalogOperations filesetCatalogOperations = new
FilesetCatalogOperations()) {
+ LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
+ when(localFileSystemProvider.scheme()).thenReturn("file");
+ when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
+ .thenAnswer(
+ invocation -> {
+// Block 100s, however, the timeout is set to 6s by default in
+// FilesetCatalogOperations, so it's expected to be over
within 10s
+Awaitility.await().forever().until(() -> false);
+return new LocalFileSystem();
+ });
+ Map fileSystemProviderMapOriginal = new
HashMap<>();
+ fileSystemProviderMapOriginal.put("file", localFileSystemProvider);
+ FieldUtils.writeField(
+ filesetCatalogOperations, "fileSystemProvidersMap",
fileSystemProviderMapOriginal, true);
+
+ FieldUtils.writeField(
+ filesetCatalogOperations, "propertiesMetadata",
FILESET_PROPERTIES_METADATA, true);
+
+ // Test the following method should finish with 10s
+ long now = System.currentTimeMillis();
+ try {
+filesetCatalogOperations.getFileSystem(new Path("file:///tmp"),
ImmutableMap.of());
+ } catch (IOException e) {
+long timeTake = System.currentTimeMillis() - now;
+Assertions.assertTrue(timeTake <= 1);
+ }
Review Comment:
The test expects an IOException to be thrown but doesn't verify the
exception message or type. Consider adding assertions to verify that the
correct exception is thrown with an appropriate error message about the
timeout, to ensure the error handling path works as expected.
##
catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java:
##
@@ -1868,6 +1874,42 @@ public void testGetTargetLocation() throws IOException {
}
}
+ @Test
+ @Timeout(15)
+ void testGetFileSystemTimeoutThrowsException() throws Exception {
+FieldUtils.writeField(
+GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+
+try (FilesetCatalogOperations filesetCatalogOperations = new
FilesetCatalogOperations()) {
+ LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
+ when(localFileSystemProvider.scheme()).thenReturn("file");
+ when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
+ .thenAnswer(
+ invocation -> {
+// Block 100s, however, the timeout is set to 6s by default in
+// FilesetCatalogOperations, so it's expected to be over
within 10s
+Awaitility.await().forever().until(() -> false);
Review Comment:
The test uses `Awaitility.await().forever().until(() -> false)` to simulate
a blocking call. However, this creates a busy-wait loop that consumes CPU
unnecessarily. Consider using `Thread.sleep(Long.MAX_
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2589616739
##
bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java:
##
@@ -80,6 +82,25 @@ public Map getFileSystemCredentialConf(Credential[] credentials)
return result;
}
Review Comment:
done.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588837747
##
bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java:
##
@@ -80,6 +82,25 @@ public Map getFileSystemCredentialConf(Credential[] credentials)
return result;
}
Review Comment:
Please wait a moment. I had resolved them, but the CI failed unexpectedly, so
I reverted the code to check whether the failure was caused by that last commit.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588829842
##
bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java:
##
@@ -80,6 +82,25 @@ public Map getFileSystemCredentialConf(Credential[] credentials)
return result;
}
Review Comment:
You have resolved all the comments, but I don't see the code change.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588828466
##
bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java:
##
@@ -80,6 +82,25 @@ public Map getFileSystemCredentialConf(Credential[] credentials)
return result;
}
Review Comment:
Where did you fix the code?
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 closed pull request #9282: [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem
URL: https://github.com/apache/gravitino/pull/9282
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588213796
##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -137,6 +140,23 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache fileSystemCache;
+ private final ThreadPoolExecutor fileSystemExecutor =
+ new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
Review Comment:
I have limited the maximum number of threads to 24.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588209823
##
bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java:
##
@@ -129,6 +131,33 @@ private void checkAndSetCredentialProvider(Map configs) {
}
}
Review Comment:
added
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588211961
##
catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java:
##
@@ -1868,6 +1872,41 @@ public void testGetTargetLocation() throws IOException {
}
}
+ @Test
+ void testGetFileSystemWithTimeout() throws Exception {
+FieldUtils.writeField(
+GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+
+FilesetCatalogOperations filesetCatalogOperations = new
FilesetCatalogOperations();
+
+LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
+when(localFileSystemProvider.scheme()).thenReturn("file");
+when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
+.thenAnswer(
+invocation -> {
+ // Sleep 100s, however, the timeout is set to 6s by default in
+ // FilesetCatalogOperations, so
+ // it's expected to over within 10s
+ Thread.sleep(10); // Simulate delay
Review Comment:
fixed
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588211126
##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -994,6 +1014,8 @@ public void close() throws IOException {
scheduler.shutdownNow();
}
+fileSystemExecutor.shutdownNow();
Review Comment:
resolved.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588207478
##
bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java:
##
@@ -62,6 +64,25 @@ public Map getFileSystemCredentialConf(Credential[] credentials)
return result;
}
Review Comment:
added.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588206187
##
catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java:
##
@@ -1868,6 +1872,41 @@ public void testGetTargetLocation() throws IOException {
}
}
+ @Test
+ void testGetFileSystemWithTimeout() throws Exception {
+FieldUtils.writeField(
+GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+
+FilesetCatalogOperations filesetCatalogOperations = new
FilesetCatalogOperations();
+
+LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
+when(localFileSystemProvider.scheme()).thenReturn("file");
+when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
+.thenAnswer(
+invocation -> {
+ // Sleep 100s, however, the timeout is set to 6s by default in
+ // FilesetCatalogOperations, so
+ // it's expected to over within 10s
+ Thread.sleep(10); // Simulate delay
+ return new LocalFileSystem();
+});
+Map fileSystemProviderMapOriginal = new
HashMap<>();
+fileSystemProviderMapOriginal.put("file", localFileSystemProvider);
+FieldUtils.writeField(
+filesetCatalogOperations, "fileSystemProvidersMap",
fileSystemProviderMapOriginal, true);
+
+FieldUtils.writeField(
+filesetCatalogOperations, "propertiesMetadata",
FILESET_PROPERTIES_METADATA, true);
+
+// Test the following method should finish with 10s
+long now = System.currentTimeMillis();
+try {
+ filesetCatalogOperations.getFileSystem(new Path("file:///tmp"),
ImmutableMap.of());
+} catch (IOException e) {
+ Assertions.assertTrue(System.currentTimeMillis() - now <= 1);
+}
+ }
Review Comment:
fix
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588204767
##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -137,6 +140,23 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache fileSystemCache;
+ private final ThreadPoolExecutor fileSystemExecutor =
+ new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+ 5L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(1000),
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("fileset-filesystem-getter-pool-%d")
+ .build(),
+ new ThreadPoolExecutor.AbortPolicy()) {
+{
+ allowCoreThreadTimeOut(true);
+}
+ };
Review Comment:
No need as we allow core thread timeout.
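The reply above leans on `allowCoreThreadTimeOut(true)`: with it enabled, idle core threads exit after the keep-alive time, so an unused pool shrinks to zero threads. A minimal sketch of that behavior, using only `java.util.concurrent`; the class name, the 50 ms keep-alive, and the 2 s polling deadline are illustrative assumptions, not values from the PR:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the PR.
class CoreTimeoutSketch {
  // Runs one task, then reports how many worker threads remain once the pool is idle.
  static int idlePoolSize(boolean allowCoreTimeout) {
    ThreadPoolExecutor executor =
        new ThreadPoolExecutor(1, 1, 50L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
    executor.allowCoreThreadTimeOut(allowCoreTimeout);
    try {
      // Start the single core thread and wait for the task to finish.
      executor.submit(() -> {}).get();
      // Poll for up to 2 seconds to give the idle worker time to exit.
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
      while (executor.getPoolSize() > 0 && System.nanoTime() < deadline) {
        Thread.sleep(10);
      }
      return executor.getPoolSize();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("core timeout allowed:  " + idlePoolSize(true));
    System.out.println("core timeout disabled: " + idlePoolSize(false));
  }
}
```

Note that threads stuck inside a blocking call are not idle, so core-thread timeout does not reclaim them; that case still depends on cancellation or shutdown.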
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588200668
##
bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java:
##
@@ -101,6 +103,19 @@ public Map getFileSystemCredentialConf(Credential[] credentials)
return result;
}
Review Comment:
added.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588199414
##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -137,6 +140,23 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache fileSystemCache;
+ private final ThreadPoolExecutor fileSystemExecutor =
+ new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+ 5L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(1000),
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("fileset-filesystem-getter-pool-%d")
+ .build(),
+ new ThreadPoolExecutor.AbortPolicy()) {
+{
+ allowCoreThreadTimeOut(true);
+}
+ };
Review Comment:
fixed.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588198427
##
catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java:
##
@@ -44,6 +48,25 @@ public String scheme() {
return SCHEME_HDFS;
}
+ private Map additionalHDFSConfig(Map
configs) {
+Map additionalConfigs = Maps.newHashMap(configs);
+
+// Avoid multiple retries to speed up failure in test cases.
+// Use hard code instead of
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY to
+// avoid dependency on a specific Hadoop version.
+if (!configs.containsKey("ipc.client.connect.timeout")) {
+ additionalConfigs.put("ipc.client.connect.timeout", "5000");
Review Comment:
added.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588196910
##
bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java:
##
@@ -80,6 +82,25 @@ public Map getFileSystemCredentialConf(Credential[] credentials)
return result;
}
Review Comment:
added
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2588197541
##
catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java:
##
@@ -44,6 +48,25 @@ public String scheme() {
return SCHEME_HDFS;
}
Review Comment:
added.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
Copilot commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2587894164
##
catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java:
##
@@ -1868,6 +1872,41 @@ public void testGetTargetLocation() throws IOException {
}
}
+ @Test
+ void testGetFileSystemWithTimeout() throws Exception {
+FieldUtils.writeField(
+GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+
+FilesetCatalogOperations filesetCatalogOperations = new
FilesetCatalogOperations();
+
+LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
+when(localFileSystemProvider.scheme()).thenReturn("file");
+when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
Review Comment:
Using `Mockito.any(Path.class)` and `Mockito.anyMap()` instead of the
imported `any()` matcher is inconsistent. For consistency with the changes on
lines 1462 and 1464 which use `any()`, these should also use `any()`:
```java
when(localFileSystemProvider.getFileSystem(any(Path.class), anyMap()))
```
```suggestion
when(localFileSystemProvider.getFileSystem(any(), any()))
```
##
catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java:
##
@@ -1868,6 +1872,41 @@ public void testGetTargetLocation() throws IOException {
}
}
+ @Test
+ void testGetFileSystemWithTimeout() throws Exception {
+FieldUtils.writeField(
+GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+
+FilesetCatalogOperations filesetCatalogOperations = new
FilesetCatalogOperations();
+
+LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
+when(localFileSystemProvider.scheme()).thenReturn("file");
+when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
+.thenAnswer(
+invocation -> {
+ // Sleep 100s, however, the timeout is set to 6s by default in
+ // FilesetCatalogOperations, so
+ // it's expected to over within 10s
+ Thread.sleep(10); // Simulate delay
+ return new LocalFileSystem();
+});
+Map fileSystemProviderMapOriginal = new
HashMap<>();
+fileSystemProviderMapOriginal.put("file", localFileSystemProvider);
+FieldUtils.writeField(
+filesetCatalogOperations, "fileSystemProvidersMap",
fileSystemProviderMapOriginal, true);
+
+FieldUtils.writeField(
+filesetCatalogOperations, "propertiesMetadata",
FILESET_PROPERTIES_METADATA, true);
+
+// Test the following method should finish with 10s
+long now = System.currentTimeMillis();
+try {
+ filesetCatalogOperations.getFileSystem(new Path("file:///tmp"),
ImmutableMap.of());
+} catch (IOException e) {
+ Assertions.assertTrue(System.currentTimeMillis() - now <= 1);
+}
+ }
Review Comment:
The test should call `filesetCatalogOperations.close()` to clean up the
thread pool created in the test. Without this, the `fileSystemExecutor` threads
will remain running after the test completes, potentially causing resource
leaks or interference with other tests.
Add cleanup after the assertion:
```java
} finally {
filesetCatalogOperations.close();
}
```
```suggestion
try {
LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
when(localFileSystemProvider.scheme()).thenReturn("file");
when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
.thenAnswer(
invocation -> {
// Sleep 100s, however, the timeout is set to 6s by default
in
// FilesetCatalogOperations, so
// it's expected to over within 10s
Thread.sleep(10); // Simulate delay
return new LocalFileSystem();
});
Map fileSystemProviderMapOriginal = new
HashMap<>();
fileSystemProviderMapOriginal.put("file", localFileSystemProvider);
FieldUtils.writeField(
filesetCatalogOperations, "fileSystemProvidersMap",
fileSystemProviderMapOriginal, true);
FieldUtils.writeField(
filesetCatalogOperations, "propertiesMetadata",
FILESET_PROPERTIES_METADATA, true);
// Test the following method should finish with 10s
long now = System.currentTimeMillis();
try {
filesetCatalogOperations.getFileSystem(new Path("file:///tmp"),
ImmutableMap.of());
} catch (IOException e) {
Assertions.assertTrue(System.currentTimeMillis() - now <= 1);
}
} finally {
filesetCa
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2587859362
##
catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java:
##
@@ -1868,6 +1872,41 @@ public void testGetTargetLocation() throws IOException {
}
}
+ @Test
+ void testGetFileSystemWithTimeout() throws Exception {
+FieldUtils.writeField(
+GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+
+FilesetCatalogOperations filesetCatalogOperations = new
FilesetCatalogOperations();
+
+LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
+when(localFileSystemProvider.scheme()).thenReturn("file");
+when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
+.thenAnswer(
+invocation -> {
+ // Sleep 100s, however, the timeout is set to 6s by default in
+ // FilesetCatalogOperations, so
+ // it's expected to over within 10s
+ Thread.sleep(10); // Simulate delay
Review Comment:
Don't use sleep; it is too flaky. Use Awaitility instead.
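One way to take the sleep out of a timeout test is to have the stubbed provider block on a latch that the test releases only in cleanup, so the timeout path is hit regardless of machine speed. The sketch below is a stdlib-only illustration of that idea and does not use Awaitility, which is what the reviewer asked for; `LatchTimeoutSketch`, `getWithTimeout`, and the 200 ms timeout are hypothetical names and values, and the helper only mirrors the `Future.get(timeout)` pattern from the PR.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of the PR.
class LatchTimeoutSketch {
  // Stand-in for the Future-based retrieval: wait up to timeoutMillis, then cancel.
  static String getWithTimeout(ExecutorService executor, Callable<String> task, long timeoutMillis)
      throws IOException {
    Future<String> future = executor.submit(task);
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupt the stuck task
      throw new IOException("Timed out after " + timeoutMillis + " ms", e);
    } catch (Exception e) {
      throw new IOException("Task failed", e);
    }
  }

  // The stub blocks on a latch instead of sleeping, so it cannot finish before the timeout.
  static boolean blockedProviderTimesOut() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch release = new CountDownLatch(1);
    try {
      getWithTimeout(
          executor,
          () -> {
            release.await(); // blocks until released or interrupted
            return "never returned";
          },
          200);
      return false;
    } catch (IOException e) {
      return e.getCause() instanceof TimeoutException;
    } finally {
      release.countDown();
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("timed out: " + blockedProviderTimesOut());
  }
}
```

Because the stub never completes on its own, the test asserts on the exception type rather than on elapsed wall-clock time.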
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2587856945
##
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##
@@ -137,6 +140,23 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache fileSystemCache;
+ private final ThreadPoolExecutor fileSystemExecutor =
+ new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
Review Comment:
This will create too many threads on servers with more than 32 cores.
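A rough sketch of how the pool size could be bounded so it stops scaling on large machines. The cap of 24 is the figure the author mentions in a reply elsewhere in this thread; the names `BoundedPoolSketch`, `MAX_THREADS`, and `poolSize` are hypothetical, and the actual change in the PR may be shaped differently.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the PR.
class BoundedPoolSketch {
  // Assumed cap, taken from the author's reply in this thread.
  static final int MAX_THREADS = 24;

  // At least 2 threads, at most MAX_THREADS, otherwise twice the core count.
  static int poolSize(int availableProcessors) {
    return Math.max(2, Math.min(availableProcessors * 2, MAX_THREADS));
  }

  static ThreadPoolExecutor newExecutor(int availableProcessors) {
    int size = poolSize(availableProcessors);
    ThreadPoolExecutor executor =
        new ThreadPoolExecutor(
            size,
            size,
            5L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadPoolExecutor.AbortPolicy());
    // Let idle core threads exit, as in the quoted diff.
    executor.allowCoreThreadTimeOut(true);
    return executor;
  }

  public static void main(String[] args) {
    int cores = Runtime.getRuntime().availableProcessors();
    ThreadPoolExecutor executor = newExecutor(cores);
    System.out.println(cores + " cores -> " + executor.getMaximumPoolSize() + " threads");
    executor.shutdownNow();
  }
}
```

With this shape a 64-core server gets 24 threads instead of 128, while small machines keep the existing floor of 2.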
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2587853585
##
bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java:
##
@@ -80,6 +82,25 @@ public Map
getFileSystemCredentialConf(Credential[] credentials)
return result;
}
+ private Map additionalOSSConfig(Map configs)
{
Review Comment:
Move the private method after the public methods.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
jerryshao commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2587842170
##
bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java:
##
@@ -80,6 +82,25 @@ public Map
getFileSystemCredentialConf(Credential[] credentials)
return result;
}
+ private Map additionalOSSConfig(Map configs)
{
+Map additionalConfigs = Maps.newHashMap(configs);
+
+// Avoid multiple retries to speed up failure in test cases.
+// Use hard code instead of Constants.ESTABLISH_TIMEOUT_KEY to avoid
dependency on a specific
+// Hadoop version.
+if (!configs.containsKey("fs.oss.connection.establish.timeout")) {
+ additionalConfigs.put("fs.oss.connection.establish.timeout", "5000");
+}
+
+if (!configs.containsKey("fs.oss.attempts.maximum")) {
+ additionalConfigs.put("fs.oss.attempts.maximum", "2");
+}
Review Comment:
I think you can define all the conf keys and values in the hadoop-common or
catalog-common module, so they are easier to maintain and update.
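As a sketch of what that could look like, the keys and default values from the quoted diffs can live in one class, with a single merge helper shared by all providers. Everything named here (`FileSystemClientDefaultsSketch`, `OSS_DEFAULTS`, `S3_DEFAULTS`, `withDefaults`) is hypothetical; only the key strings and values come from the diffs in this thread, and the sketch uses `Collections.unmodifiableMap` rather than Guava to stay dependency-free.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical shared holder for fail-fast defaults; not part of the PR.
final class FileSystemClientDefaultsSketch {
  static final Map<String, String> OSS_DEFAULTS =
      defaults(
          "fs.oss.connection.establish.timeout", "5000",
          "fs.oss.attempts.maximum", "2");

  static final Map<String, String> S3_DEFAULTS =
      defaults(
          "fs.s3a.attempts.maximum", "2",
          "fs.s3a.connection.establish.timeout", "5000",
          "fs.s3a.retry.limit", "2",
          "fs.s3a.retry.throttle.limit", "2");

  private FileSystemClientDefaultsSketch() {}

  // Builds an unmodifiable map from alternating key/value arguments.
  private static Map<String, String> defaults(String... keysAndValues) {
    Map<String, String> map = new LinkedHashMap<>();
    for (int i = 0; i < keysAndValues.length; i += 2) {
      map.put(keysAndValues[i], keysAndValues[i + 1]);
    }
    return Collections.unmodifiableMap(map);
  }

  // Copies the user config and adds each default only when the key is absent.
  static Map<String, String> withDefaults(
      Map<String, String> userConfigs, Map<String, String> defaults) {
    Map<String, String> merged = new HashMap<>(userConfigs);
    for (Map.Entry<String, String> entry : defaults.entrySet()) {
      if (!userConfigs.containsKey(entry.getKey())) {
        merged.put(entry.getKey(), entry.getValue());
      }
    }
    return Collections.unmodifiableMap(merged);
  }

  public static void main(String[] args) {
    Map<String, String> user = new HashMap<>();
    user.put("fs.oss.attempts.maximum", "7");
    System.out.println(withDefaults(user, OSS_DEFAULTS));
  }
}
```

Each provider would then call the shared helper with its own defaults map instead of repeating the `containsKey`/`put` pairs.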
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
yuqi1129 commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2584011512
##
bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java:
##
@@ -129,6 +131,33 @@ private void checkAndSetCredentialProvider(Map configs) {
}
}
+ private Map additionalS3Config(Map configs) {
+Map additionalConfigs = Maps.newHashMap(configs);
+
+// Avoid multiple retries to speed up failure in test cases.
+// Use hard code instead of Constants.MAX_ERROR_RETRIES to avoid
dependency on a specific Hadoop
+// version
+if (!configs.containsKey("fs.s3a.attempts.maximum")) {
+ additionalConfigs.put("fs.s3a.attempts.maximum", "2");
+}
+
+if (!configs.containsKey("fs.s3a.connection.establish.timeout")) {
+ additionalConfigs.put("fs.s3a.connection.establish.timeout", "5000");
+}
+
+if (!configs.containsKey("fs.s3a.retry.limit")) {
+ additionalConfigs.put("fs.s3a.retry.limit", "2");
+}
+
+if (!configs.containsKey("fs.s3a.retry.throttle.limit")) {
+ additionalConfigs.put("fs.s3a.retry.throttle.limit", "2");
+}
+
+// More tuning can be added here.
+
+return ImmutableMap.copyOf(additionalConfigs);
+ }
Review Comment:
I have tested it locally with a real S3 account. To verify it in a test, we
need to trigger the test in CI.
Re: [PR] [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use `future` and solve hang problem [gravitino]
Copilot commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2583408603
##
bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java:
##
@@ -129,6 +131,33 @@ private void checkAndSetCredentialProvider(Map configs) {
}
}
+ private Map additionalS3Config(Map configs) {
+Map additionalConfigs = Maps.newHashMap(configs);
+
+// Avoid multiple retries to speed up failure in test cases.
+// Use hard code instead of Constants.MAX_ERROR_RETRIES to avoid
dependency on a specific Hadoop
+// version
+if (!configs.containsKey("fs.s3a.attempts.maximum")) {
+ additionalConfigs.put("fs.s3a.attempts.maximum", "2");
+}
+
+if (!configs.containsKey("fs.s3a.connection.establish.timeout")) {
+ additionalConfigs.put("fs.s3a.connection.establish.timeout", "5000");
+}
+
+if (!configs.containsKey("fs.s3a.retry.limit")) {
+ additionalConfigs.put("fs.s3a.retry.limit", "2");
+}
+
+if (!configs.containsKey("fs.s3a.retry.throttle.limit")) {
+ additionalConfigs.put("fs.s3a.retry.throttle.limit", "2");
+}
+
+// More tuning can be added here.
+
+return ImmutableMap.copyOf(additionalConfigs);
+ }
Review Comment:
The `additionalS3Config` method lacks test coverage. Consider adding tests
to verify:
1. Default timeout and retry values are applied when not present in the
config
2. User-provided values are preserved and not overridden
3. The returned map is immutable (ImmutableMap behavior)
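The three checks listed above could be sketched roughly as follows. Since `additionalS3Config` is private in the PR, the method here is a local stand-in that copies two of the keys from the quoted diff and swaps Guava's `ImmutableMap` for `Collections.unmodifiableMap`; the class and check method names are hypothetical, and a real test would use the project's JUnit setup rather than boolean helpers.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; not part of the PR.
class AdditionalS3ConfigChecks {
  // Stand-in for the private method in the diff (subset of keys, no Guava).
  static Map<String, String> additionalS3Config(Map<String, String> configs) {
    Map<String, String> additional = new HashMap<>(configs);
    if (!configs.containsKey("fs.s3a.attempts.maximum")) {
      additional.put("fs.s3a.attempts.maximum", "2");
    }
    if (!configs.containsKey("fs.s3a.connection.establish.timeout")) {
      additional.put("fs.s3a.connection.establish.timeout", "5000");
    }
    return Collections.unmodifiableMap(additional);
  }

  // Check 1: defaults appear when the caller supplies nothing.
  static boolean appliesDefaults() {
    Map<String, String> result = additionalS3Config(new HashMap<>());
    return "2".equals(result.get("fs.s3a.attempts.maximum"))
        && "5000".equals(result.get("fs.s3a.connection.establish.timeout"));
  }

  // Check 2: a user-provided value is kept, and unrelated defaults are still added.
  static boolean preservesUserValues() {
    Map<String, String> user = new HashMap<>();
    user.put("fs.s3a.attempts.maximum", "10");
    Map<String, String> result = additionalS3Config(user);
    return "10".equals(result.get("fs.s3a.attempts.maximum"))
        && "5000".equals(result.get("fs.s3a.connection.establish.timeout"));
  }

  // Check 3: the returned map rejects modification.
  static boolean isImmutable() {
    try {
      additionalS3Config(new HashMap<>()).put("k", "v");
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(appliesDefaults() && preservesUserValues() && isImmutable());
  }
}
```

The same three checks would apply unchanged to the Azure, GCS, and OSS helpers, with only the keys swapped.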
##
bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java:
##
@@ -101,6 +103,19 @@ public Map
getFileSystemCredentialConf(Credential[] credentials)
return result;
}
+ private Map additionalAzureConfig(Map
configs) {
+Map additionalConfigs = Maps.newHashMap(configs);
+
+// Avoid multiple retries to speed up failure in test cases.
+if (!configs.containsKey("fs.azure.io.retry.max.retries")) {
+ additionalConfigs.put("fs.azure.io.retry.max.retries", "2");
+}
+
+// More tuning can be added here.
+
+return ImmutableMap.copyOf(additionalConfigs);
+ }
Review Comment:
The `additionalAzureConfig` method lacks test coverage. Consider adding
tests to verify:
1. Default retry values are applied when not present in the config
2. User-provided values are preserved and not overridden
3. The returned map is immutable (ImmutableMap behavior)
##
bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java:
##
@@ -62,6 +64,25 @@ public Map
getFileSystemCredentialConf(Credential[] credentials)
return result;
}
+ private Map additionalGCSConfig(Map configs)
{
+Map additionalConfigs = Maps.newHashMap(configs);
+
+// Avoid multiple retries to speed up failure in test cases.
+// Use hard code instead of
GoogleHadoopFileSystemBase.GCS_HTTP_CONNECT_TIMEOUT_KEY to avoid
+// dependency on a specific Hadoop version
+if (!configs.containsKey("fs.gs.http.connect-timeout")) {
+ additionalConfigs.put("fs.gs.http.connect-timeout", "5000");
+}
+
+if (!configs.containsKey("fs.gs.http.max.retry")) {
+ additionalConfigs.put("fs.gs.http.max.retry", "2");
+}
+
+// More tuning can be added here.
+
+return ImmutableMap.copyOf(additionalConfigs);
+ }
Review Comment:
The `additionalGCSConfig` method lacks test coverage. Consider adding tests
to verify:
1. Default timeout and retry values are applied when not present in the
config
2. User-provided values are preserved and not overridden
3. The returned map is immutable (ImmutableMap behavior)
##
bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java:
##
@@ -80,6 +82,25 @@ public Map
getFileSystemCredentialConf(Credential[] credentials)
return result;
}
+ private Map additionalOSSConfig(Map configs)
{
+Map additionalConfigs = Maps.newHashMap(configs);
+
+// Avoid multiple retries to speed up failure in test cases.
+// Use hard code instead of Constants.ESTABLISH_TIMEOUT_KEY to avoid
dependency on a specific
+// Hadoop version.
+if (!configs.containsKey("fs.oss.connection.establish.timeout")) {
+ additionalConfigs.put("fs.oss.connection.establish.timeout", "5000");
+}
+
+if (!configs.containsKey("fs.oss.attempts.maximum")) {
+ additionalConfigs.put("fs.oss.attempts.maximum", "2");
+}
+
+// More tuning can be added here.
+
+return ImmutableMap.copyOf(additionalConfigs);
+ }
Review Comment:
The `additionalOSSConfig` method lacks test coverage. Consider adding tests
to verify:
1. Default timeout and retry values are applied when not present in the
config
2. User-provided values are preserved and not overridden
3. The returned map is immutable (ImmutableMap behavior)
##
catalogs/catalog-fileset/src/main/java/org/apache/g
