This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new d22bdc59843 [MINOR] Avoid resource leaks (#10345) d22bdc59843 is described below commit d22bdc5984387c0664fa43854f19b3988f5149fc Author: Tim Brown <t...@onehouse.ai> AuthorDate: Wed Jan 10 17:06:00 2024 -0800 [MINOR] Avoid resource leaks (#10345) --- .../main/java/org/apache/hudi/metrics/Metrics.java | 35 +++++++++++++++------- .../hudi/testutils/TestHoodieMetadataBase.java | 2 +- .../hudi/common/table/log/HoodieLogFileReader.java | 1 + .../common/table/log/HoodieLogFormatWriter.java | 1 + .../common/util/collection/LazyFileIterable.java | 9 +++++- .../hudi/internal/schema/utils/SerDeHelper.java | 6 ++-- .../io/storage/HoodieBootstrapRecordIterator.java | 3 +- .../hudi/common/testutils/SchemaTestUtil.java | 5 ++-- .../hudi/hadoop/TestHoodieHFileInputFormat.java | 1 + .../hudi/hadoop/TestHoodieParquetInputFormat.java | 2 ++ .../realtime/TestHoodieRealtimeRecordReader.java | 3 ++ 11 files changed, 49 insertions(+), 19 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java index 47ee23bcc2f..31b0d19da01 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -50,6 +50,7 @@ public class Metrics { private final List<MetricsReporter> reporters; private final String commonMetricPrefix; private boolean initialized = false; + private transient Thread shutdownThread = null; public Metrics(HoodieWriteConfig metricConfig) { registry = new MetricRegistry(); @@ -65,7 +66,8 @@ public class Metrics { } reporters.forEach(MetricsReporter::start); - Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); + shutdownThread = new Thread(() -> shutdown(true)); + Runtime.getRuntime().addShutdownHook(shutdownThread); this.initialized = true; } @@ -112,16 +114,27 @@ public class Metrics { return reporterList; } - public synchronized void shutdown() { - try { - registerHoodieCommonMetrics(); - reporters.forEach(MetricsReporter::report); - LOG.info("Stopping the metrics reporter..."); - reporters.forEach(MetricsReporter::stop); - } catch (Exception e) { - LOG.warn("Error while closing reporter", e); - } finally { - initialized = false; + public void shutdown() { + shutdown(false); + } + + private synchronized void shutdown(boolean fromShutdownHook) { + if (!fromShutdownHook) { + Runtime.getRuntime().removeShutdownHook(shutdownThread); + } else { + LOG.warn("Shutting down the metrics reporter from shutdown hook."); + } + if (initialized) { + try { + registerHoodieCommonMetrics(); + reporters.forEach(MetricsReporter::report); + LOG.info("Stopping the metrics reporter..."); + reporters.forEach(MetricsReporter::stop); + } catch (Exception e) { + LOG.warn("Error while closing reporter", e); + } finally { + initialized = false; + } } } diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java index 7f9e4712cff..336309deb23 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java @@ -296,7 +296,7 @@ public class TestHoodieMetadataBase extends HoodieJavaClientTestHarness { .withAutoClean(false).retainCommits(1).retainFileVersions(1) .build()) .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).build()) - .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table") + .withEmbeddedTimelineServerEnabled(false).forTable("test-trip-table") .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() .withEnableBackupForRemoteFileSystemView(false).build()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index 6759650af78..764c919d2e1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -142,6 +142,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private void addShutDownHook() { shutdownThread = new Thread(() -> { try { + LOG.warn("Failed to properly close HoodieLogFileReader in application."); close(); } catch (Exception e) { LOG.warn("unable to close input stream for log file " + logFile, e); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java index f190b883c30..4e5082d4541 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java @@ -276,6 +276,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { shutdownThread = new Thread() { public void run() { try { + LOG.warn("running logformatwriter hook"); if (output != null) { close(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java index 8e2210d61ee..799aa3d4d56 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java @@ -21,6 +21,9 @@ package org.apache.hudi.common.util.collection; import org.apache.hudi.common.util.BufferedRandomAccessFile; import org.apache.hudi.exception.HoodieException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.Iterator; import java.util.Map; @@ -32,6 +35,7 @@ import java.util.stream.Collectors; * the latest value for a key spilled to disk and returns the result. */ public class LazyFileIterable<T, R> implements Iterable<R> { + private static final Logger LOG = LoggerFactory.getLogger(LazyFileIterable.class); // Used to access the value written at a specific position in the file private final String filePath; @@ -128,7 +132,10 @@ public class LazyFileIterable<T, R> implements Iterable<R> { } private void addShutdownHook() { - shutdownThread = new Thread(this::closeHandle); + shutdownThread = new Thread(() -> { + LOG.warn("Failed to properly close LazyFileIterable in application."); + this.closeHandle(); + }); Runtime.getRuntime().addShutdownHook(shutdownThread); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java index f47d7f8da51..7891fc4582c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java @@ -18,6 +18,7 @@ package org.apache.hudi.internal.schema.utils; +import org.apache.hudi.common.util.JsonUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -28,7 +29,6 @@ import org.apache.hudi.internal.schema.Types; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.StringWriter; @@ -295,7 +295,7 @@ public class SerDeHelper { return Option.empty(); } try { - return Option.of(fromJson((new ObjectMapper(new JsonFactory())).readValue(json, JsonNode.class))); + return Option.of(fromJson(JsonUtils.getObjectMapper().readTree(json))); } catch (IOException e) { throw new RuntimeException(e); } @@ -311,7 +311,7 @@ public class SerDeHelper { public static TreeMap<Long, InternalSchema> parseSchemas(String json) { TreeMap<Long, InternalSchema> result = new TreeMap<>(); try { - JsonNode jsonNode = (new ObjectMapper(new JsonFactory())).readValue(json, JsonNode.class); + JsonNode jsonNode = JsonUtils.getObjectMapper().readTree(json); if (!jsonNode.has(SCHEMAS)) { throw new IllegalArgumentException(String.format("cannot parser schemas from current json string, missing key name: %s", SCHEMAS)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java index 43f2d1ad1ad..6fa398a8225 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java @@ -50,7 +50,8 @@ public abstract class HoodieBootstrapRecordIterator<T> implements ClosableIterat @Override public void close() { - + skeletonIterator.close(); + dataFileIterator.close(); } @Override diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java index a96d53a273f..9ee16174973 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java @@ -37,6 +37,7 @@ import org.apache.avro.io.DecoderFactory; import org.apache.avro.util.Utf8; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -271,8 +272,8 @@ public final class SchemaTestUtil { } public static Schema getSchemaFromResource(Class<?> clazz, String name, boolean withHoodieMetadata) { - try { - Schema schema = new Schema.Parser().parse(clazz.getResourceAsStream(name)); + try (InputStream schemaInputStream = clazz.getResourceAsStream(name)) { + Schema schema = new Schema.Parser().parse(schemaInputStream); return withHoodieMetadata ? HoodieAvroUtils.addMetadataFields(schema) : schema; } catch (IOException e) { throw new RuntimeException(String.format("Failed to get schema from resource `%s` for class `%s`", name, clazz.getName())); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java index a5260766bf1..ec6e3ee94dd 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java @@ -518,6 +518,7 @@ public class TestHoodieHFileInputFormat { } totalCount++; } + recordReader.close(); } assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg); assertEquals(totalExpected, totalCount, msg); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index 0b0a615c263..d71055079c2 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -766,6 +766,7 @@ public class TestHoodieParquetInputFormat { } totalCount++; } + recordReader.close(); } assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg); assertEquals(totalExpected, totalCount, msg); @@ -821,6 +822,7 @@ public class TestHoodieParquetInputFormat { // test date assertEquals(LocalDate.ofEpochDay(testDate).toString(), String.valueOf(writable.get()[2])); } + recordReader.close(); } } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index 15d983ee48b..350728b9d64 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -510,6 +510,7 @@ public class TestHoodieRealtimeRecordReader { } reader.close(); } + recordReader.close(); } @ParameterizedTest @@ -593,6 +594,7 @@ public class TestHoodieRealtimeRecordReader { while (recordReader.next(key, value)) { // keep reading } + recordReader.close(); reader.close(); } @@ -650,6 +652,7 @@ public class TestHoodieRealtimeRecordReader { while (recordReader.next(key, value)) { // keep reading } + recordReader.close(); reader.close(); }