This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7fc4aa3f5ff [SPARK-45533][CORE] Use j.l.r.Cleaner instead of finalize for RocksDBIterator/LevelDBIterator 7fc4aa3f5ff is described below commit 7fc4aa3f5ff98f871725f7cab027067b900e6706 Author: zhaomin <zhaomin1...@163.com> AuthorDate: Tue Nov 21 22:48:03 2023 +0800 [SPARK-45533][CORE] Use j.l.r.Cleaner instead of finalize for RocksDBIterator/LevelDBIterator ### What changes were proposed in this pull request? use java.lang.ref.Cleaner instead of finalize() for RocksDBIterator ### Why are the changes needed? The finalize() method has been marked as deprecated since Java 9 and will be removed in the future, java.lang.ref.Cleaner is the more recommended solution. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass actions and new tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #43502 from zhaomin1423/45533_2. Lead-authored-by: zhaomin <zhaomin1...@163.com> Co-authored-by: Min Zhao <zhaomin1...@163.com> Co-authored-by: yanbei.zm <yanbei...@antgroup.com> Signed-off-by: yangjie01 <yangji...@baidu.com> --- common/kvstore/pom.xml | 9 ++- .../org/apache/spark/util/kvstore/LevelDB.java | 10 ++- .../apache/spark/util/kvstore/LevelDBIterator.java | 77 ++++++++++++++++++---- .../org/apache/spark/util/kvstore/RocksDB.java | 9 ++- .../apache/spark/util/kvstore/RocksDBIterator.java | 65 +++++++++++++++--- .../apache/spark/util/kvstore/LevelDBSuite.java | 39 +++++++++++ .../apache/spark/util/kvstore/RocksDBSuite.java | 39 +++++++++++ 7 files changed, 216 insertions(+), 32 deletions(-) diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml index 54b7f401cc4..a9b5a463471 100644 --- a/common/kvstore/pom.xml +++ b/common/kvstore/pom.xml @@ -60,6 +60,10 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + 
<artifactId>slf4j-api</artifactId> + </dependency> <dependency> <groupId>commons-io</groupId> @@ -82,11 +86,6 @@ <artifactId>log4j-1.2-api</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j2-impl</artifactId> diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index b50906e2cba..13a9d89f470 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -34,6 +34,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import org.fusesource.leveldbjni.JniDBFactory; import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBIterator; import org.iq80.leveldb.Options; import org.iq80.leveldb.WriteBatch; @@ -326,7 +327,7 @@ public class LevelDB implements KVStore { * Closes the given iterator if the DB is still open. Trying to close a JNI LevelDB handle * with a closed DB can cause JVM crashes, so this ensures that situation does not happen. */ - void closeIterator(LevelDBIterator<?> it) throws IOException { + void closeIterator(DBIterator it) throws IOException { notifyIteratorClosed(it); synchronized (this._db) { DB _db = this._db.get(); @@ -340,8 +341,11 @@ public class LevelDB implements KVStore { * Remove iterator from iterator tracker. `LevelDBIterator` calls it to notify * iterator is closed. 
*/ - void notifyIteratorClosed(LevelDBIterator<?> it) { - iteratorTracker.removeIf(ref -> it.equals(ref.get())); + void notifyIteratorClosed(DBIterator dbIterator) { + iteratorTracker.removeIf(ref -> { + LevelDBIterator<?> it = ref.get(); + return it != null && dbIterator.equals(it.internalIterator()); + }); } /** Returns metadata about indices for the given type. */ diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java index 35d0c6065fb..b830e6afc61 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java @@ -18,18 +18,24 @@ package org.apache.spark.util.kvstore; import java.io.IOException; +import java.lang.ref.Cleaner; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import org.iq80.leveldb.DBIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class LevelDBIterator<T> implements KVStoreIterator<T> { + private static final Cleaner CLEANER = Cleaner.create(); + private final LevelDB db; private final boolean ascending; private final DBIterator it; @@ -40,6 +46,9 @@ class LevelDBIterator<T> implements KVStoreIterator<T> { private final byte[] end; private final long max; + private final ResourceCleaner resourceCleaner; + private final Cleaner.Cleanable cleanable; + private boolean checkedNext; private byte[] next; private boolean closed; @@ -53,6 +62,8 @@ class LevelDBIterator<T> implements KVStoreIterator<T> { this.ti = db.getTypeInfo(type); this.index = ti.index(params.index); this.max = params.max; + this.resourceCleaner = new ResourceCleaner(it, db); + 
this.cleanable = CLEANER.register(this, this.resourceCleaner); Preconditions.checkArgument(!index.isChild() || params.parent != null, "Cannot iterate over child index %s without parent value.", params.index); @@ -182,23 +193,33 @@ class LevelDBIterator<T> implements KVStoreIterator<T> { @Override public synchronized void close() throws IOException { - db.notifyIteratorClosed(this); + db.notifyIteratorClosed(it); if (!closed) { - it.close(); - closed = true; - next = null; + try { + it.close(); + } finally { + closed = true; + next = null; + cancelResourceClean(); + } } } /** - * Because it's tricky to expose closeable iterators through many internal APIs, especially - * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by - * the iterator will eventually be released. + * Prevent ResourceCleaner from trying to release resources after close. */ - @SuppressWarnings("deprecation") - @Override - protected void finalize() throws Throwable { - db.closeIterator(this); + private void cancelResourceClean() { + this.resourceCleaner.setStartedToFalse(); + this.cleanable.clean(); + } + + DBIterator internalIterator() { + return it; + } + + @VisibleForTesting + ResourceCleaner getResourceCleaner() { + return resourceCleaner; } private byte[] loadNext() { @@ -280,4 +301,38 @@ class LevelDBIterator<T> implements KVStoreIterator<T> { return a.length - b.length; } + static class ResourceCleaner implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ResourceCleaner.class); + + private final DBIterator dbIterator; + + private final LevelDB levelDB; + + private final AtomicBoolean started = new AtomicBoolean(true); + + ResourceCleaner(DBIterator dbIterator, LevelDB levelDB) { + this.dbIterator = dbIterator; + this.levelDB = levelDB; + } + + @Override + public void run() { + if (started.compareAndSet(true, false)) { + try { + levelDB.closeIterator(dbIterator); + } catch (IOException e) { + LOG.warn("Failed to close 
iterator", e); + } + } + } + + void setStartedToFalse() { + started.set(false); + } + + @VisibleForTesting + boolean isCompleted() { + return !started.get(); + } + } } diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java index d328e5c79d3..dc7ad0be5c0 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java @@ -359,7 +359,7 @@ public class RocksDB implements KVStore { * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle * with a closed DB can cause JVM crashes, so this ensures that situation does not happen. */ - void closeIterator(RocksDBIterator<?> it) throws IOException { + void closeIterator(RocksIterator it) { notifyIteratorClosed(it); synchronized (this._db) { org.rocksdb.RocksDB _db = this._db.get(); @@ -373,8 +373,11 @@ public class RocksDB implements KVStore { * Remove iterator from iterator tracker. `RocksDBIterator` calls it to notify * iterator is closed. */ - void notifyIteratorClosed(RocksDBIterator<?> it) { - iteratorTracker.removeIf(ref -> it.equals(ref.get())); + void notifyIteratorClosed(RocksIterator rocksIterator) { + iteratorTracker.removeIf(ref -> { + RocksDBIterator<?> rocksDBIterator = ref.get(); + return rocksDBIterator != null && rocksIterator.equals(rocksDBIterator.internalIterator()); + }); } /** Returns metadata about indices for the given type. 
*/ diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java index ba7b8d8b813..a98b0482e35 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java @@ -18,7 +18,9 @@ package org.apache.spark.util.kvstore; import java.io.IOException; +import java.lang.ref.Cleaner; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -27,6 +29,7 @@ import org.rocksdb.RocksIterator; class RocksDBIterator<T> implements KVStoreIterator<T> { + private static final Cleaner CLEANER = Cleaner.create(); private final RocksDB db; private final boolean ascending; private final RocksIterator it; @@ -36,6 +39,8 @@ class RocksDBIterator<T> implements KVStoreIterator<T> { private final byte[] indexKeyPrefix; private final byte[] end; private final long max; + private final Cleaner.Cleanable cleanable; + private final RocksDBIterator.ResourceCleaner resourceCleaner; private boolean checkedNext; private byte[] next; @@ -50,6 +55,8 @@ class RocksDBIterator<T> implements KVStoreIterator<T> { this.ti = db.getTypeInfo(type); this.index = ti.index(params.index); this.max = params.max; + this.resourceCleaner = new RocksDBIterator.ResourceCleaner(it, db); + this.cleanable = CLEANER.register(this, resourceCleaner); Preconditions.checkArgument(!index.isChild() || params.parent != null, "Cannot iterate over child index %s without parent value.", params.index); @@ -176,22 +183,33 @@ class RocksDBIterator<T> implements KVStoreIterator<T> { @Override public synchronized void close() throws IOException { - db.notifyIteratorClosed(this); + db.notifyIteratorClosed(it); if (!closed) { - it.close(); - closed = true; - next = null; + try { + it.close(); + } 
finally { + closed = true; + next = null; + cancelResourceClean(); + } } } /** - * Because it's tricky to expose closeable iterators through many internal APIs, especially - * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by - * the iterator will eventually be released. + * Prevent ResourceCleaner from actually releasing resources after close it. */ - @Override - protected void finalize() throws Throwable { - db.closeIterator(this); + private void cancelResourceClean() { + this.resourceCleaner.setStartedToFalse(); + this.cleanable.clean(); + } + + @VisibleForTesting + ResourceCleaner getResourceCleaner() { + return resourceCleaner; + } + + RocksIterator internalIterator() { + return it; } private byte[] loadNext() { @@ -272,4 +290,31 @@ class RocksDBIterator<T> implements KVStoreIterator<T> { return a.length - b.length; } + static class ResourceCleaner implements Runnable { + + private final RocksIterator rocksIterator; + private final RocksDB rocksDB; + private final AtomicBoolean started = new AtomicBoolean(true); + + ResourceCleaner(RocksIterator rocksIterator, RocksDB rocksDB) { + this.rocksIterator = rocksIterator; + this.rocksDB = rocksDB; + } + + @Override + public void run() { + if (started.compareAndSet(true, false)) { + rocksDB.closeIterator(rocksIterator); + } + } + + void setStartedToFalse() { + started.set(false); + } + + @VisibleForTesting + boolean isCompleted() { + return !started.get(); + } + } } diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java index ec1c8101737..c22aea821af 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java @@ -18,6 +18,8 @@ package org.apache.spark.util.kvstore; import java.io.File; +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; 
import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -33,6 +35,7 @@ import org.iq80.leveldb.DBIterator; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; + import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assumptions.assumeFalse; @@ -383,6 +386,42 @@ public class LevelDBSuite { assertFalse(iter.skip(1)); } + @Test + public void testResourceCleaner() throws Exception { + File dbPathForCleanerTest = File.createTempFile( + "test_db_cleaner.", ".rdb"); + dbPathForCleanerTest.delete(); + + LevelDB dbForCleanerTest = new LevelDB(dbPathForCleanerTest); + try { + for (int i = 0; i < 8192; i++) { + dbForCleanerTest.write(createCustomType1(i)); + } + LevelDBIterator<CustomType1> levelDBIterator = + (LevelDBIterator<CustomType1>) dbForCleanerTest.view(CustomType1.class).iterator(); + Reference<LevelDBIterator<?>> reference = new WeakReference<>(levelDBIterator); + assertNotNull(reference); + LevelDBIterator.ResourceCleaner resourceCleaner = levelDBIterator.getResourceCleaner(); + assertFalse(resourceCleaner.isCompleted()); + // Manually set levelDBIterator to null so that it can be GCed. + levelDBIterator = null; + // 100 times gc, the levelDBIterator should be GCed. + int count = 0; + while (count < 100 && !reference.refersTo(null)) { + System.gc(); + count++; + Thread.sleep(100); + } + // check levelDBIterator should be GCed + assertTrue(reference.refersTo(null)); + // Verify that the Cleaner will be executed after a period of time, and isCompleted() will become true.
+ assertTrue(resourceCleaner.isCompleted()); + } finally { + dbForCleanerTest.close(); + FileUtils.deleteQuietly(dbPathForCleanerTest); + } + } + private CustomType1 createCustomType1(int i) { CustomType1 t = new CustomType1(); t.key = "key" + i; diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java index b61a7afcd07..61f18a9a26d 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java @@ -18,6 +18,8 @@ package org.apache.spark.util.kvstore; import java.io.File; +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -381,6 +383,43 @@ public class RocksDBSuite { assertFalse(iter.skip(1)); } + @Test + public void testResourceCleaner() throws Exception { + File dbPathForCleanerTest = File.createTempFile( + "test_db_cleaner.", ".rdb"); + dbPathForCleanerTest.delete(); + + RocksDB dbForCleanerTest = new RocksDB(dbPathForCleanerTest); + try { + for (int i = 0; i < 8192; i++) { + dbForCleanerTest.write(createCustomType1(i)); + } + RocksDBIterator<CustomType1> rocksDBIterator = + (RocksDBIterator<CustomType1>) dbForCleanerTest.view(CustomType1.class).iterator(); + Reference<RocksDBIterator<?>> reference = new WeakReference<>(rocksDBIterator); + assertNotNull(reference); + RocksDBIterator.ResourceCleaner resourceCleaner = rocksDBIterator.getResourceCleaner(); + assertFalse(resourceCleaner.isCompleted()); + // Manually set rocksDBIterator to null, to be GC. + rocksDBIterator = null; + // 100 times gc, the rocksDBIterator should be GCed. 
+ int count = 0; + while (count < 100 && !reference.refersTo(null)) { + System.gc(); + count++; + Thread.sleep(100); + } + // check rocksDBIterator should be GCed + assertTrue(reference.refersTo(null)); + // Verify that the Cleaner will be executed after a period of time, + // and status will become false. + assertTrue(resourceCleaner.isCompleted()); + } finally { + dbForCleanerTest.close(); + FileUtils.deleteQuietly(dbPathForCleanerTest); + } + } + private CustomType1 createCustomType1(int i) { CustomType1 t = new CustomType1(); t.key = "key" + i; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org