This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d22bdc59843 [MINOR] Avoid resource leaks  (#10345)
d22bdc59843 is described below

commit d22bdc5984387c0664fa43854f19b3988f5149fc
Author: Tim Brown <t...@onehouse.ai>
AuthorDate: Wed Jan 10 17:06:00 2024 -0800

    [MINOR] Avoid resource leaks  (#10345)
---
 .../main/java/org/apache/hudi/metrics/Metrics.java | 35 +++++++++++++++-------
 .../hudi/testutils/TestHoodieMetadataBase.java     |  2 +-
 .../hudi/common/table/log/HoodieLogFileReader.java |  1 +
 .../common/table/log/HoodieLogFormatWriter.java    |  1 +
 .../common/util/collection/LazyFileIterable.java   |  9 +++++-
 .../hudi/internal/schema/utils/SerDeHelper.java    |  6 ++--
 .../io/storage/HoodieBootstrapRecordIterator.java  |  3 +-
 .../hudi/common/testutils/SchemaTestUtil.java      |  5 ++--
 .../hudi/hadoop/TestHoodieHFileInputFormat.java    |  1 +
 .../hudi/hadoop/TestHoodieParquetInputFormat.java  |  2 ++
 .../realtime/TestHoodieRealtimeRecordReader.java   |  3 ++
 11 files changed, 49 insertions(+), 19 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index 47ee23bcc2f..31b0d19da01 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -50,6 +50,7 @@ public class Metrics {
   private final List<MetricsReporter> reporters;
   private final String commonMetricPrefix;
   private boolean initialized = false;
+  private transient Thread shutdownThread = null;
 
   public Metrics(HoodieWriteConfig metricConfig) {
     registry = new MetricRegistry();
@@ -65,7 +66,8 @@ public class Metrics {
     }
     reporters.forEach(MetricsReporter::start);
 
-    Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
+    shutdownThread = new Thread(() -> shutdown(true));
+    Runtime.getRuntime().addShutdownHook(shutdownThread);
     this.initialized = true;
   }
 
@@ -112,16 +114,27 @@ public class Metrics {
     return reporterList;
   }
 
-  public synchronized void shutdown() {
-    try {
-      registerHoodieCommonMetrics();
-      reporters.forEach(MetricsReporter::report);
-      LOG.info("Stopping the metrics reporter...");
-      reporters.forEach(MetricsReporter::stop);
-    } catch (Exception e) {
-      LOG.warn("Error while closing reporter", e);
-    } finally {
-      initialized = false;
+  public void shutdown() {
+    shutdown(false);
+  }
+
+  private synchronized void shutdown(boolean fromShutdownHook) {
+    if (!fromShutdownHook) {
+      Runtime.getRuntime().removeShutdownHook(shutdownThread);
+    } else {
+      LOG.warn("Shutting down the metrics reporter from shutdown hook.");
+    }
+    if (initialized) {
+      try {
+        registerHoodieCommonMetrics();
+        reporters.forEach(MetricsReporter::report);
+        LOG.info("Stopping the metrics reporter...");
+        reporters.forEach(MetricsReporter::stop);
+      } catch (Exception e) {
+        LOG.warn("Error while closing reporter", e);
+      } finally {
+        initialized = false;
+      }
     }
   }
 
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
index 7f9e4712cff..336309deb23 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
@@ -296,7 +296,7 @@ public class TestHoodieMetadataBase extends 
HoodieJavaClientTestHarness {
             .withAutoClean(false).retainCommits(1).retainFileVersions(1)
             .build())
         
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 
1024 * 1024).build())
-        .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
+        .withEmbeddedTimelineServerEnabled(false).forTable("test-trip-table")
         .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
             .withEnableBackupForRemoteFileSystemView(false).build())
         
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 6759650af78..764c919d2e1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -142,6 +142,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   private void addShutDownHook() {
     shutdownThread = new Thread(() -> {
       try {
+        LOG.warn("Failed to properly close HoodieLogFileReader in 
application.");
         close();
       } catch (Exception e) {
         LOG.warn("unable to close input stream for log file " + logFile, e);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index f190b883c30..4e5082d4541 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -276,6 +276,7 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
     shutdownThread = new Thread() {
       public void run() {
         try {
+          LOG.warn("running logformatwriter hook");
           if (output != null) {
             close();
           }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java
index 8e2210d61ee..799aa3d4d56 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java
@@ -21,6 +21,9 @@ package org.apache.hudi.common.util.collection;
 import org.apache.hudi.common.util.BufferedRandomAccessFile;
 import org.apache.hudi.exception.HoodieException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
@@ -32,6 +35,7 @@ import java.util.stream.Collectors;
  * the latest value for a key spilled to disk and returns the result.
  */
 public class LazyFileIterable<T, R> implements Iterable<R> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LazyFileIterable.class);
 
   // Used to access the value written at a specific position in the file
   private final String filePath;
@@ -128,7 +132,10 @@ public class LazyFileIterable<T, R> implements Iterable<R> 
{
     }
 
     private void addShutdownHook() {
-      shutdownThread = new Thread(this::closeHandle);
+      shutdownThread = new Thread(() -> {
+        LOG.warn("Failed to properly close LazyFileIterable in application.");
+        this.closeHandle();
+      });
       Runtime.getRuntime().addShutdownHook(shutdownThread);
     }
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java
 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java
index f47d7f8da51..7891fc4582c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.internal.schema.utils;
 
+import org.apache.hudi.common.util.JsonUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -28,7 +29,6 @@ import org.apache.hudi.internal.schema.Types;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.io.IOException;
 import java.io.StringWriter;
@@ -295,7 +295,7 @@ public class SerDeHelper {
       return Option.empty();
     }
     try {
-      return Option.of(fromJson((new ObjectMapper(new 
JsonFactory())).readValue(json, JsonNode.class)));
+      return Option.of(fromJson(JsonUtils.getObjectMapper().readTree(json)));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -311,7 +311,7 @@ public class SerDeHelper {
   public static TreeMap<Long, InternalSchema> parseSchemas(String json) {
     TreeMap<Long, InternalSchema> result = new TreeMap<>();
     try {
-      JsonNode jsonNode = (new ObjectMapper(new 
JsonFactory())).readValue(json, JsonNode.class);
+      JsonNode jsonNode = JsonUtils.getObjectMapper().readTree(json);
       if (!jsonNode.has(SCHEMAS)) {
         throw new IllegalArgumentException(String.format("cannot parser 
schemas from current json string, missing key name: %s", SCHEMAS));
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java
index 43f2d1ad1ad..6fa398a8225 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java
@@ -50,7 +50,8 @@ public abstract class HoodieBootstrapRecordIterator<T> 
implements ClosableIterat
 
   @Override
   public void close() {
-
+    skeletonIterator.close();
+    dataFileIterator.close();
   }
 
   @Override
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index a96d53a273f..9ee16174973 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -37,6 +37,7 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.util.Utf8;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
@@ -271,8 +272,8 @@ public final class SchemaTestUtil {
   }
 
   public static Schema getSchemaFromResource(Class<?> clazz, String name, 
boolean withHoodieMetadata) {
-    try {
-      Schema schema = new 
Schema.Parser().parse(clazz.getResourceAsStream(name));
+    try (InputStream schemaInputStream = clazz.getResourceAsStream(name)) {
+      Schema schema = new Schema.Parser().parse(schemaInputStream);
       return withHoodieMetadata ? HoodieAvroUtils.addMetadataFields(schema) : 
schema;
     } catch (IOException e) {
       throw new RuntimeException(String.format("Failed to get schema from 
resource `%s` for class `%s`", name, clazz.getName()));
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
index a5260766bf1..ec6e3ee94dd 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
@@ -518,6 +518,7 @@ public class TestHoodieHFileInputFormat {
         }
         totalCount++;
       }
+      recordReader.close();
     }
     assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg);
     assertEquals(totalExpected, totalCount, msg);
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 0b0a615c263..d71055079c2 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -766,6 +766,7 @@ public class TestHoodieParquetInputFormat {
         }
         totalCount++;
       }
+      recordReader.close();
     }
     assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg);
     assertEquals(totalExpected, totalCount, msg);
@@ -821,6 +822,7 @@ public class TestHoodieParquetInputFormat {
         // test date
         assertEquals(LocalDate.ofEpochDay(testDate).toString(), 
String.valueOf(writable.get()[2]));
       }
+      recordReader.close();
     }
   }
 }
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 15d983ee48b..350728b9d64 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -510,6 +510,7 @@ public class TestHoodieRealtimeRecordReader {
       }
       reader.close();
     }
+    recordReader.close();
   }
 
   @ParameterizedTest
@@ -593,6 +594,7 @@ public class TestHoodieRealtimeRecordReader {
     while (recordReader.next(key, value)) {
       // keep reading
     }
+    recordReader.close();
     reader.close();
   }
 
@@ -650,6 +652,7 @@ public class TestHoodieRealtimeRecordReader {
     while (recordReader.next(key, value)) {
       // keep reading
     }
+    recordReader.close();
     reader.close();
   }
 

Reply via email to