This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fc4aa3f5ff [SPARK-45533][CORE] Use j.l.r.Cleaner instead of finalize 
for RocksDBIterator/LevelDBIterator
7fc4aa3f5ff is described below

commit 7fc4aa3f5ff98f871725f7cab027067b900e6706
Author: zhaomin <zhaomin1...@163.com>
AuthorDate: Tue Nov 21 22:48:03 2023 +0800

    [SPARK-45533][CORE] Use j.l.r.Cleaner instead of finalize for RocksDBIterator/LevelDBIterator
    
    ### What changes were proposed in this pull request?
    
    Use `java.lang.ref.Cleaner` instead of `finalize()` for `RocksDBIterator` and `LevelDBIterator`.
    
    ### Why are the changes needed?
    
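    The `finalize()` method has been deprecated since Java 9 and will be removed in a future
    release; `java.lang.ref.Cleaner` is the recommended replacement. The cleanup action
    (`ResourceCleaner`) holds only the native iterator handle and the DB, never the wrapping
    iterator itself, so the wrapper can still be garbage-collected. An explicit `close()`
    disables the action and deregisters it via `Cleanable.clean()`.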
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Passed GitHub Actions CI and the new `testResourceCleaner` tests in `LevelDBSuite` and `RocksDBSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43502 from zhaomin1423/45533_2.
    
    Lead-authored-by: zhaomin <zhaomin1...@163.com>
    Co-authored-by: Min Zhao <zhaomin1...@163.com>
    Co-authored-by: yanbei.zm <yanbei...@antgroup.com>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 common/kvstore/pom.xml                             |  9 ++-
 .../org/apache/spark/util/kvstore/LevelDB.java     | 10 ++-
 .../apache/spark/util/kvstore/LevelDBIterator.java | 77 ++++++++++++++++++----
 .../org/apache/spark/util/kvstore/RocksDB.java     |  9 ++-
 .../apache/spark/util/kvstore/RocksDBIterator.java | 65 +++++++++++++++---
 .../apache/spark/util/kvstore/LevelDBSuite.java    | 39 +++++++++++
 .../apache/spark/util/kvstore/RocksDBSuite.java    | 39 +++++++++++
 7 files changed, 216 insertions(+), 32 deletions(-)

diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml
index 54b7f401cc4..a9b5a463471 100644
--- a/common/kvstore/pom.xml
+++ b/common/kvstore/pom.xml
@@ -60,6 +60,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>commons-io</groupId>
@@ -82,11 +86,6 @@
       <artifactId>log4j-1.2-api</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j2-impl</artifactId>
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index b50906e2cba..13a9d89f470 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -34,6 +34,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
 import org.iq80.leveldb.Options;
 import org.iq80.leveldb.WriteBatch;
 
@@ -326,7 +327,7 @@ public class LevelDB implements KVStore {
    * Closes the given iterator if the DB is still open. Trying to close a JNI 
LevelDB handle
    * with a closed DB can cause JVM crashes, so this ensures that situation 
does not happen.
    */
-  void closeIterator(LevelDBIterator<?> it) throws IOException {
+  void closeIterator(DBIterator it) throws IOException {
     notifyIteratorClosed(it);
     synchronized (this._db) {
       DB _db = this._db.get();
@@ -340,8 +341,11 @@ public class LevelDB implements KVStore {
    * Remove iterator from iterator tracker. `LevelDBIterator` calls it to 
notify
    * iterator is closed.
    */
-  void notifyIteratorClosed(LevelDBIterator<?> it) {
-    iteratorTracker.removeIf(ref -> it.equals(ref.get()));
+  void notifyIteratorClosed(DBIterator dbIterator) {
+    iteratorTracker.removeIf(ref -> {
+      LevelDBIterator<?> it = ref.get();
+      return it != null && dbIterator.equals(it.internalIterator());
+    });
   }
 
   /** Returns metadata about indices for the given type. */
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
index 35d0c6065fb..b830e6afc61 100644
--- 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
+++ 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
@@ -18,18 +18,24 @@
 package org.apache.spark.util.kvstore;
 
 import java.io.IOException;
+import java.lang.ref.Cleaner;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import org.iq80.leveldb.DBIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class LevelDBIterator<T> implements KVStoreIterator<T> {
 
+  private static final Cleaner CLEANER = Cleaner.create();
+
   private final LevelDB db;
   private final boolean ascending;
   private final DBIterator it;
@@ -40,6 +46,9 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
   private final byte[] end;
   private final long max;
 
+  private final ResourceCleaner resourceCleaner;
+  private final Cleaner.Cleanable cleanable;
+
   private boolean checkedNext;
   private byte[] next;
   private boolean closed;
@@ -53,6 +62,8 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
     this.ti = db.getTypeInfo(type);
     this.index = ti.index(params.index);
     this.max = params.max;
+    this.resourceCleaner = new ResourceCleaner(it, db);
+    this.cleanable = CLEANER.register(this, this.resourceCleaner);
 
     Preconditions.checkArgument(!index.isChild() || params.parent != null,
       "Cannot iterate over child index %s without parent value.", 
params.index);
@@ -182,23 +193,33 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
 
   @Override
   public synchronized void close() throws IOException {
-    db.notifyIteratorClosed(this);
+    db.notifyIteratorClosed(it);
     if (!closed) {
-      it.close();
-      closed = true;
-      next = null;
+      try {
+        it.close();
+      } finally {
+        closed = true;
+        next = null;
+        cancelResourceClean();
+      }
     }
   }
 
   /**
-   * Because it's tricky to expose closeable iterators through many internal 
APIs, especially
-   * when Scala wrappers are used, this makes sure that, hopefully, the JNI 
resources held by
-   * the iterator will eventually be released.
+   * Prevent ResourceCleaner from trying to release resources after close.
    */
-  @SuppressWarnings("deprecation")
-  @Override
-  protected void finalize() throws Throwable {
-    db.closeIterator(this);
+  private void cancelResourceClean() {
+    this.resourceCleaner.setStartedToFalse();
+    this.cleanable.clean();
+  }
+
+  DBIterator internalIterator() {
+    return it;
+  }
+
+  @VisibleForTesting
+  ResourceCleaner getResourceCleaner() {
+    return resourceCleaner;
   }
 
   private byte[] loadNext() {
@@ -280,4 +301,38 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
     return a.length - b.length;
   }
 
+  static class ResourceCleaner implements Runnable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResourceCleaner.class);
+
+    private final DBIterator dbIterator;
+
+    private final LevelDB levelDB;
+
+    private final AtomicBoolean started = new AtomicBoolean(true);
+
+    ResourceCleaner(DBIterator dbIterator, LevelDB levelDB) {
+      this.dbIterator = dbIterator;
+      this.levelDB = levelDB;
+    }
+
+    @Override
+    public void run() {
+      if (started.compareAndSet(true, false)) {
+        try {
+          levelDB.closeIterator(dbIterator);
+        } catch (IOException e) {
+          LOG.warn("Failed to close iterator", e);
+        }
+      }
+    }
+
+    void setStartedToFalse() {
+      started.set(false);
+    }
+
+    @VisibleForTesting
+    boolean isCompleted() {
+      return !started.get();
+    }
+  }
 }
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
index d328e5c79d3..dc7ad0be5c0 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
@@ -359,7 +359,7 @@ public class RocksDB implements KVStore {
    * Closes the given iterator if the DB is still open. Trying to close a JNI 
RocksDB handle
    * with a closed DB can cause JVM crashes, so this ensures that situation 
does not happen.
    */
-  void closeIterator(RocksDBIterator<?> it) throws IOException {
+  void closeIterator(RocksIterator it) {
     notifyIteratorClosed(it);
     synchronized (this._db) {
       org.rocksdb.RocksDB _db = this._db.get();
@@ -373,8 +373,11 @@ public class RocksDB implements KVStore {
    * Remove iterator from iterator tracker. `RocksDBIterator` calls it to 
notify
    * iterator is closed.
    */
-  void notifyIteratorClosed(RocksDBIterator<?> it) {
-    iteratorTracker.removeIf(ref -> it.equals(ref.get()));
+  void notifyIteratorClosed(RocksIterator rocksIterator) {
+    iteratorTracker.removeIf(ref -> {
+      RocksDBIterator<?> rocksDBIterator = ref.get();
+      return rocksDBIterator != null && 
rocksIterator.equals(rocksDBIterator.internalIterator());
+    });
   }
 
   /** Returns metadata about indices for the given type. */
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
index ba7b8d8b813..a98b0482e35 100644
--- 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
+++ 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
@@ -18,7 +18,9 @@
 package org.apache.spark.util.kvstore;
 
 import java.io.IOException;
+import java.lang.ref.Cleaner;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -27,6 +29,7 @@ import org.rocksdb.RocksIterator;
 
 class RocksDBIterator<T> implements KVStoreIterator<T> {
 
+  private static final Cleaner CLEANER = Cleaner.create();
   private final RocksDB db;
   private final boolean ascending;
   private final RocksIterator it;
@@ -36,6 +39,8 @@ class RocksDBIterator<T> implements KVStoreIterator<T> {
   private final byte[] indexKeyPrefix;
   private final byte[] end;
   private final long max;
+  private final Cleaner.Cleanable cleanable;
+  private final RocksDBIterator.ResourceCleaner resourceCleaner;
 
   private boolean checkedNext;
   private byte[] next;
@@ -50,6 +55,8 @@ class RocksDBIterator<T> implements KVStoreIterator<T> {
     this.ti = db.getTypeInfo(type);
     this.index = ti.index(params.index);
     this.max = params.max;
+    this.resourceCleaner = new RocksDBIterator.ResourceCleaner(it, db);
+    this.cleanable = CLEANER.register(this, resourceCleaner);
 
     Preconditions.checkArgument(!index.isChild() || params.parent != null,
       "Cannot iterate over child index %s without parent value.", 
params.index);
@@ -176,22 +183,33 @@ class RocksDBIterator<T> implements KVStoreIterator<T> {
 
   @Override
   public synchronized void close() throws IOException {
-    db.notifyIteratorClosed(this);
+    db.notifyIteratorClosed(it);
     if (!closed) {
-      it.close();
-      closed = true;
-      next = null;
+      try {
+        it.close();
+      } finally {
+        closed = true;
+        next = null;
+        cancelResourceClean();
+      }
     }
   }
 
   /**
-   * Because it's tricky to expose closeable iterators through many internal 
APIs, especially
-   * when Scala wrappers are used, this makes sure that, hopefully, the JNI 
resources held by
-   * the iterator will eventually be released.
+   * Prevent ResourceCleaner from releasing resources again after this iterator is closed.
    */
-  @Override
-  protected void finalize() throws Throwable {
-    db.closeIterator(this);
+  private void cancelResourceClean() {
+    this.resourceCleaner.setStartedToFalse();
+    this.cleanable.clean();
+  }
+
+  @VisibleForTesting
+  ResourceCleaner getResourceCleaner() {
+    return resourceCleaner;
+  }
+
+  RocksIterator internalIterator() {
+    return it;
   }
 
   private byte[] loadNext() {
@@ -272,4 +290,31 @@ class RocksDBIterator<T> implements KVStoreIterator<T> {
     return a.length - b.length;
   }
 
+  static class ResourceCleaner implements Runnable {
+
+    private final RocksIterator rocksIterator;
+    private final RocksDB rocksDB;
+    private final AtomicBoolean started = new AtomicBoolean(true);
+
+    ResourceCleaner(RocksIterator rocksIterator, RocksDB rocksDB) {
+      this.rocksIterator = rocksIterator;
+      this.rocksDB = rocksDB;
+    }
+
+    @Override
+    public void run() {
+      if (started.compareAndSet(true, false)) {
+        rocksDB.closeIterator(rocksIterator);
+      }
+    }
+
+    void setStartedToFalse() {
+      started.set(false);
+    }
+
+    @VisibleForTesting
+    boolean isCompleted() {
+      return !started.get();
+    }
+  }
 }
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
index ec1c8101737..c22aea821af 100644
--- 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
+++ 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
@@ -18,6 +18,8 @@
 package org.apache.spark.util.kvstore;
 
 import java.io.File;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -33,6 +35,7 @@ import org.iq80.leveldb.DBIterator;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+
 import static org.junit.jupiter.api.Assertions.*;
 import static org.junit.jupiter.api.Assumptions.assumeFalse;
 
@@ -383,6 +386,42 @@ public class LevelDBSuite {
     assertFalse(iter.skip(1));
   }
 
+  @Test
+  public void testResourceCleaner() throws Exception {
+    File dbPathForCleanerTest = File.createTempFile(
+      "test_db_cleaner.", ".rdb");
+    dbPathForCleanerTest.delete();
+
+    LevelDB dbForCleanerTest = new LevelDB(dbPathForCleanerTest);
+    try {
+      for (int i = 0; i < 8192; i++) {
+        dbForCleanerTest.write(createCustomType1(i));
+      }
+      LevelDBIterator<CustomType1> levelDBIterator =
+        (LevelDBIterator<CustomType1>) 
dbForCleanerTest.view(CustomType1.class).iterator();
+      Reference<LevelDBIterator<?>> reference = new 
WeakReference<>(levelDBIterator);
+      assertNotNull(reference);
+      LevelDBIterator.ResourceCleaner resourceCleaner = 
levelDBIterator.getResourceCleaner();
+      assertFalse(resourceCleaner.isCompleted());
+      // Manually set levelDBIterator to null, to be GC.
+      levelDBIterator = null;
+      // 100 times gc, the levelDBIterator should be GCed.
+      int count = 0;
+      while (count < 100 && !reference.refersTo(null)) {
+        System.gc();
+        count++;
+        Thread.sleep(100);
+      }
+      // Check that levelDBIterator has been GCed.
+      assertTrue(reference.refersTo(null));
+      // Verify that the Cleaner has run after a period of time, so isCompleted() is true.
+      assertTrue(resourceCleaner.isCompleted());
+    } finally {
+      dbForCleanerTest.close();
+      FileUtils.deleteQuietly(dbPathForCleanerTest);
+    }
+  }
+
   private CustomType1 createCustomType1(int i) {
     CustomType1 t = new CustomType1();
     t.key = "key" + i;
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
index b61a7afcd07..61f18a9a26d 100644
--- 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
+++ 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
@@ -18,6 +18,8 @@
 package org.apache.spark.util.kvstore;
 
 import java.io.File;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -381,6 +383,43 @@ public class RocksDBSuite {
     assertFalse(iter.skip(1));
   }
 
+  @Test
+  public void testResourceCleaner() throws Exception {
+    File dbPathForCleanerTest = File.createTempFile(
+      "test_db_cleaner.", ".rdb");
+    dbPathForCleanerTest.delete();
+
+    RocksDB dbForCleanerTest = new RocksDB(dbPathForCleanerTest);
+    try {
+      for (int i = 0; i < 8192; i++) {
+        dbForCleanerTest.write(createCustomType1(i));
+      }
+      RocksDBIterator<CustomType1> rocksDBIterator =
+        (RocksDBIterator<CustomType1>) 
dbForCleanerTest.view(CustomType1.class).iterator();
+      Reference<RocksDBIterator<?>> reference = new 
WeakReference<>(rocksDBIterator);
+      assertNotNull(reference);
+      RocksDBIterator.ResourceCleaner resourceCleaner = 
rocksDBIterator.getResourceCleaner();
+      assertFalse(resourceCleaner.isCompleted());
+      // Manually set rocksDBIterator to null, to be GC.
+      rocksDBIterator = null;
+      // 100 times gc, the rocksDBIterator should be GCed.
+      int count = 0;
+      while (count < 100 && !reference.refersTo(null)) {
+        System.gc();
+        count++;
+        Thread.sleep(100);
+      }
+      // check rocksDBIterator should be GCed
+      assertTrue(reference.refersTo(null));
+      // Verify that the Cleaner has run after a period of time, so the `started`
+      // flag is false and isCompleted() is true.
+      assertTrue(resourceCleaner.isCompleted());
+    } finally {
+      dbForCleanerTest.close();
+      FileUtils.deleteQuietly(dbPathForCleanerTest);
+    }
+  }
+
   private CustomType1 createCustomType1(int i) {
     CustomType1 t = new CustomType1();
     t.key = "key" + i;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to