This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7258f69 [SPARK-35396] Add AutoCloseable close to BlockManager and InMemoryRelation 7258f69 is described below commit 7258f691887aedcf7ba3eb4e478d67a5637643b9 Author: Chendi Xue <chendi....@intel.com> AuthorDate: Tue May 25 08:55:25 2021 -0500 [SPARK-35396] Add AutoCloseable close to BlockManager and InMemoryRelation This PR proposes an add-on to support manually closing entries in MemoryStore and InMemoryRelation. ### What changes were proposed in this pull request? Currently: MemoryStore uses a LinkedHashMap[BlockId, MemoryEntry[_]] to store all OnHeap or OffHeap entries. When memoryStore.remove(blockId) is called, the code simply removes the entry from the LinkedHashMap and relies on Java GC to do the release work. This PR: We are proposing an add-on to manually close any object stored in MemoryStore and InMemoryRelation if the object extends AutoCloseable. Verification: In our own use case, we implemented a user-defined off-heap-hashRelation for BHJ, and we verified that, by adding this manual close, our user-defined off-heap-hashRelation is released when evict is called. We also implemented a user-defined cachedBatch, which will be released when InMemoryRelation.clearCache() is called by this PR. ### Why are the changes needed? These changes help clean up off-heap user-defined objects that may be cached in InMemoryRelation or MemoryStore. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? WIP Signed-off-by: Chendi Xue <chendi.xueintel.com> Closes #32534 from xuechendi/support_manual_close_in_memorystore. 
Authored-by: Chendi Xue <chendi....@intel.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../apache/spark/storage/memory/MemoryStore.scala | 32 ++++++- .../apache/spark/storage/MemoryStoreSuite.scala | 100 +++++++++++++++++++++ 2 files changed, 131 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 375d05b..2079b26 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -21,9 +21,14 @@ import java.io.OutputStream import java.nio.ByteBuffer import java.util.LinkedHashMap +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future import scala.reflect.ClassTag +import scala.util.{Failure, Success} +import scala.util.control.NonFatal import com.google.common.io.ByteStreams @@ -385,9 +390,33 @@ private[spark] class MemoryStore( } } + def manualClose[T <: MemoryEntry[_]](entry: T): T = { + val entryManualCloseTasks = Future { + entry match { + case e: DeserializedMemoryEntry[_] => e.value.foreach { + case o: AutoCloseable => + try { + o.close + } catch { + case NonFatal(e) => + logWarning(s"Got NonFatal exception during remove") + } + case _ => + } + case _ => + } + } + entryManualCloseTasks.onComplete { + case Success(_) => + case Failure(e) => throw e + } + entry + } + def remove(blockId: BlockId): Boolean = memoryManager.synchronized { val entry = entries.synchronized { - entries.remove(blockId) + val removed = entries.remove(blockId) + manualClose(removed) } if (entry != null) { entry match { @@ -405,6 +434,7 @@ private[spark] class MemoryStore( def clear(): Unit = memoryManager.synchronized { entries.synchronized { + entries.values.asScala.foreach(manualClose) 
entries.clear() } onHeapUnrollMemoryMap.clear() diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala index d6a4e5b..fea4882 100644 --- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala @@ -546,4 +546,104 @@ class MemoryStoreSuite } } } + + test("put user-defined objects to MemoryStore and remove") { + val (memoryStore, _) = makeMemoryStore(12000) + val blockId = BlockId("rdd_3_10") + case class DummyAllocator() { + private var allocated: Int = 0 + def alloc(size: Int): Unit = synchronized { + allocated += size + } + def release(size: Int): Unit = synchronized { + allocated -= size + } + def getAllocatedMemory: Int = synchronized { + allocated + } + } + case class NativeObject(alloc: DummyAllocator, size: Int) + extends KnownSizeEstimation + with AutoCloseable { + alloc.alloc(size) + var allocated_size: Int = size + override def estimatedSize: Long = allocated_size + override def close(): Unit = synchronized { + alloc.release(allocated_size) + allocated_size = 0 + } + } + val allocator = DummyAllocator() + val nativeObjList = List.fill(40)(NativeObject(allocator, 100)) + def nativeObjIterator: Iterator[Any] = nativeObjList.iterator.asInstanceOf[Iterator[Any]] + def putIteratorAsValues[T]( + blockId: BlockId, + iter: Iterator[T], + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { + memoryStore.putIteratorAsValues(blockId, iter, classTag) + } + + // Unroll with plenty of space. This should succeed and cache both blocks. + assert(putIteratorAsValues("b1", nativeObjIterator, ClassTag.Any).isRight) + assert(putIteratorAsValues("b2", nativeObjIterator, ClassTag.Any).isRight) + + memoryStore.remove("b1") + memoryStore.remove("b2") + + // Check if allocator was cleared. 
+ while (allocator.getAllocatedMemory > 0) { + Thread.sleep(500) + } + assert(allocator.getAllocatedMemory == 0) + } + + test("put user-defined objects to MemoryStore and clear") { + val (memoryStore, _) = makeMemoryStore(12000) + val blockId = BlockId("rdd_3_10") + case class DummyAllocator() { + private var allocated: Int = 0 + def alloc(size: Int): Unit = synchronized { + allocated += size + } + def release(size: Int): Unit = synchronized { + allocated -= size + } + def getAllocatedMemory: Int = synchronized { + allocated + } + } + case class NativeObject(alloc: DummyAllocator, size: Int) + extends KnownSizeEstimation + with AutoCloseable { + + alloc.alloc(size) + var allocated_size: Int = size + override def estimatedSize: Long = allocated_size + override def close(): Unit = synchronized { + Thread.sleep(10) + alloc.release(allocated_size) + allocated_size = 0 + } + } + val allocator = DummyAllocator() + val nativeObjList = List.fill(40)(NativeObject(allocator, 100)) + def nativeObjIterator: Iterator[Any] = nativeObjList.iterator.asInstanceOf[Iterator[Any]] + def putIteratorAsValues[T]( + blockId: BlockId, + iter: Iterator[T], + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { + memoryStore.putIteratorAsValues(blockId, iter, classTag) + } + + // Unroll with plenty of space. This should succeed and cache both blocks. + assert(putIteratorAsValues("b1", nativeObjIterator, ClassTag.Any).isRight) + assert(putIteratorAsValues("b2", nativeObjIterator, ClassTag.Any).isRight) + + memoryStore.clear + // Check if allocator was cleared. + while (allocator.getAllocatedMemory > 0) { + Thread.sleep(500) + } + assert(allocator.getAllocatedMemory == 0) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org