This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3da2e5c6324 [SPARK-46216][CORE] Improve `FileSystemPersistenceEngine` to support compressions
3da2e5c6324 is described below

commit 3da2e5c632468ec7cf7001255c1a44197b46ce30
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Sun Dec 3 00:26:16 2023 -0800

    [SPARK-46216][CORE] Improve `FileSystemPersistenceEngine` to support compressions
    
    ### What changes were proposed in this pull request?
    
    This PR aims to improve `FileSystemPersistenceEngine` to support compression via a new configuration, `spark.deploy.recoveryCompressionCodec`.
    
    ### Why are the changes needed?
    
    To allow users to choose a proper compression codec for their workloads. For the `JavaSerializer` case, `LZ4` compression is **2x** faster than the baseline (no compression). For the `KryoSerializer` case, no compression remains the fastest option in this benchmark.
    ```
    OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
    AMD EPYC 7763 64-Core Processor
    2000 Workers:                                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ----------------------------------------------------------------------------------------------------------------------------------------
    ZooKeeperPersistenceEngine with JavaSerializer                     2276           2360         115          0.0     1137909.6       1.0X
    ZooKeeperPersistenceEngine with KryoSerializer                     1883           1906          34          0.0      941364.2       1.2X
    FileSystemPersistenceEngine with JavaSerializer                     431            436           7          0.0      215436.9       5.3X
    FileSystemPersistenceEngine with JavaSerializer (lz4)               209            216           9          0.0      104404.1      10.9X
    FileSystemPersistenceEngine with JavaSerializer (lzf)               199            202           2          0.0       99489.5      11.4X
    FileSystemPersistenceEngine with JavaSerializer (snappy)            192            199           9          0.0       95872.9      11.9X
    FileSystemPersistenceEngine with JavaSerializer (zstd)              258            264           6          0.0      129249.4       8.8X
    FileSystemPersistenceEngine with KryoSerializer                     139            151          13          0.0       69374.5      16.4X
    FileSystemPersistenceEngine with KryoSerializer (lz4)               159            165           8          0.0       79588.9      14.3X
    FileSystemPersistenceEngine with KryoSerializer (lzf)               180            195          18          0.0       89844.0      12.7X
    FileSystemPersistenceEngine with KryoSerializer (snappy)            164            183          18          0.0       82016.0      13.9X
    FileSystemPersistenceEngine with KryoSerializer (zstd)              206            218          11          0.0      102838.9      11.1X
    BlackHolePersistenceEngine                                            0              0           0         35.1          28.5   39908.5X
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a new feature.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44129 from dongjoon-hyun/SPARK-46216.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../PersistenceEngineBenchmark-jdk21-results.txt   | 22 ++++++++----
 .../PersistenceEngineBenchmark-results.txt         | 22 ++++++++----
 .../master/FileSystemPersistenceEngine.scala       | 10 ++++--
 .../spark/deploy/master/RecoveryModeFactory.scala  |  6 ++--
 .../org/apache/spark/internal/config/Deploy.scala  |  7 ++++
 .../apache/spark/deploy/master/MasterSuite.scala   | 40 ++++++++++++++++++++++
 .../deploy/master/PersistenceEngineBenchmark.scala | 19 ++++++++--
 .../deploy/master/PersistenceEngineSuite.scala     | 13 +++++++
 8 files changed, 118 insertions(+), 21 deletions(-)

diff --git a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt 
b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
index 65dbfd0990d..38e74ed6b53 100644
--- a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
+++ b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
@@ -4,12 +4,20 @@ PersistenceEngineBenchmark
 
 OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
-1000 Workers:                                    Best Time(ms)   Avg Time(ms)  
 Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
-ZooKeeperPersistenceEngine with JavaSerializer            1100           1255  
       150          0.0     1099532.9       1.0X
-ZooKeeperPersistenceEngine with KryoSerializer             946            967  
        20          0.0      946367.3       1.2X
-FileSystemPersistenceEngine with JavaSerializer            218            223  
         4          0.0      217851.5       5.0X
-FileSystemPersistenceEngine with KryoSerializer             79             87  
        12          0.0       78611.1      14.0X
-BlackHolePersistenceEngine                                   0              0  
         0         42.0          23.8   46191.1X
+2000 Workers:                                             Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------------
+ZooKeeperPersistenceEngine with JavaSerializer                     2254        
   2329         119          0.0     1126867.1       1.0X
+ZooKeeperPersistenceEngine with KryoSerializer                     1911        
   1912           1          0.0      955667.1       1.2X
+FileSystemPersistenceEngine with JavaSerializer                     438        
    448          15          0.0      218868.1       5.1X
+FileSystemPersistenceEngine with JavaSerializer (lz4)               187        
    195           8          0.0       93337.8      12.1X
+FileSystemPersistenceEngine with JavaSerializer (lzf)               193        
    216          20          0.0       96678.8      11.7X
+FileSystemPersistenceEngine with JavaSerializer (snappy)            175        
    183          10          0.0       87652.3      12.9X
+FileSystemPersistenceEngine with JavaSerializer (zstd)              243        
    255          14          0.0      121695.2       9.3X
+FileSystemPersistenceEngine with KryoSerializer                     150        
    160          15          0.0       75089.7      15.0X
+FileSystemPersistenceEngine with KryoSerializer (lz4)               170        
    177          10          0.0       84996.7      13.3X
+FileSystemPersistenceEngine with KryoSerializer (lzf)               192        
    203          12          0.0       96019.1      11.7X
+FileSystemPersistenceEngine with KryoSerializer (snappy)            184        
    202          16          0.0       92241.3      12.2X
+FileSystemPersistenceEngine with KryoSerializer (zstd)              232        
    238           5          0.0      116075.2       9.7X
+BlackHolePersistenceEngine                                            0        
      0           0         27.3          36.6   30761.0X
 
 
diff --git a/core/benchmarks/PersistenceEngineBenchmark-results.txt 
b/core/benchmarks/PersistenceEngineBenchmark-results.txt
index eb63d07b4df..fc743da46a8 100644
--- a/core/benchmarks/PersistenceEngineBenchmark-results.txt
+++ b/core/benchmarks/PersistenceEngineBenchmark-results.txt
@@ -4,12 +4,20 @@ PersistenceEngineBenchmark
 
 OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
-1000 Workers:                                    Best Time(ms)   Avg Time(ms)  
 Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
-ZooKeeperPersistenceEngine with JavaSerializer            1202           1298  
       138          0.0     1201614.2       1.0X
-ZooKeeperPersistenceEngine with KryoSerializer             951           1004  
        48          0.0      950559.0       1.3X
-FileSystemPersistenceEngine with JavaSerializer            212            217  
         6          0.0      211623.2       5.7X
-FileSystemPersistenceEngine with KryoSerializer             79             81  
         2          0.0       79132.5      15.2X
-BlackHolePersistenceEngine                                   0              0  
         0         30.9          32.4   37109.8X
+2000 Workers:                                             Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------------
+ZooKeeperPersistenceEngine with JavaSerializer                     2276        
   2360         115          0.0     1137909.6       1.0X
+ZooKeeperPersistenceEngine with KryoSerializer                     1883        
   1906          34          0.0      941364.2       1.2X
+FileSystemPersistenceEngine with JavaSerializer                     431        
    436           7          0.0      215436.9       5.3X
+FileSystemPersistenceEngine with JavaSerializer (lz4)               209        
    216           9          0.0      104404.1      10.9X
+FileSystemPersistenceEngine with JavaSerializer (lzf)               199        
    202           2          0.0       99489.5      11.4X
+FileSystemPersistenceEngine with JavaSerializer (snappy)            192        
    199           9          0.0       95872.9      11.9X
+FileSystemPersistenceEngine with JavaSerializer (zstd)              258        
    264           6          0.0      129249.4       8.8X
+FileSystemPersistenceEngine with KryoSerializer                     139        
    151          13          0.0       69374.5      16.4X
+FileSystemPersistenceEngine with KryoSerializer (lz4)               159        
    165           8          0.0       79588.9      14.3X
+FileSystemPersistenceEngine with KryoSerializer (lzf)               180        
    195          18          0.0       89844.0      12.7X
+FileSystemPersistenceEngine with KryoSerializer (snappy)            164        
    183          18          0.0       82016.0      13.9X
+FileSystemPersistenceEngine with KryoSerializer (zstd)              206        
    218          11          0.0      102838.9      11.1X
+BlackHolePersistenceEngine                                            0        
      0           0         35.1          28.5   39908.5X
 
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
 
b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 094136ea274..785367a0dee 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -23,6 +23,7 @@ import java.nio.file.{Files, Paths}
 import scala.reflect.ClassTag
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.{DeserializationStream, 
SerializationStream, Serializer}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
@@ -37,7 +38,8 @@ import org.apache.spark.util.Utils
  */
 private[master] class FileSystemPersistenceEngine(
     val dir: String,
-    val serializer: Serializer)
+    val serializer: Serializer,
+    val codec: Option[CompressionCodec] = None)
   extends PersistenceEngine with Logging {
 
   Files.createDirectories(Paths.get(dir))
@@ -62,7 +64,8 @@ private[master] class FileSystemPersistenceEngine(
     if (file.exists()) { throw new IllegalStateException("File already exists: 
" + file) }
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " 
+ file) }
-    val fileOut = new FileOutputStream(file)
+    var fileOut: OutputStream = new FileOutputStream(file)
+    codec.foreach { c => fileOut = c.compressedOutputStream(fileOut) }
     var out: SerializationStream = null
     Utils.tryWithSafeFinally {
       out = serializer.newInstance().serializeStream(fileOut)
@@ -76,7 +79,8 @@ private[master] class FileSystemPersistenceEngine(
   }
 
   private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = 
{
-    val fileIn = new FileInputStream(file)
+    var fileIn: InputStream = new FileInputStream(file)
+    codec.foreach { c => fileIn = c.compressedInputStream(new 
FileInputStream(file)) }
     var in: DeserializationStream = null
     try {
       in = serializer.newInstance().deserializeStream(fileIn)
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
index 470798793ce..c24c4e5fe6b 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -20,7 +20,8 @@ package org.apache.spark.deploy.master
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.Deploy.RECOVERY_DIRECTORY
+import org.apache.spark.internal.config.Deploy.{RECOVERY_COMPRESSION_CODEC, 
RECOVERY_DIRECTORY}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
 
 /**
@@ -57,7 +58,8 @@ private[master] class FileSystemRecoveryModeFactory(conf: 
SparkConf, serializer:
 
   def createPersistenceEngine(): PersistenceEngine = {
     logInfo("Persisting recovery state to directory: " + recoveryDir)
-    new FileSystemPersistenceEngine(recoveryDir, serializer)
+    val codec = conf.get(RECOVERY_COMPRESSION_CODEC).map(c => 
CompressionCodec.createCodec(conf, c))
+    new FileSystemPersistenceEngine(recoveryDir, serializer, codec)
   }
 
   def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent 
= {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
index ae9e8b79dfe..b52ea356789 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
@@ -39,6 +39,13 @@ private[spark] object Deploy {
     .checkValues(RecoverySerializer.values.map(_.toString))
     .createWithDefault(RecoverySerializer.JAVA.toString)
 
+  val RECOVERY_COMPRESSION_CODEC = 
ConfigBuilder("spark.deploy.recoveryCompressionCodec")
+    .doc("A compression codec for persistence engines. none (default), lz4, 
lzf, snappy, and " +
+      "zstd. Currently, only FILESYSTEM mode supports this configuration.")
+    .version("4.0.0")
+    .stringConf
+    .createOptional
+
   val RECOVERY_MODE_FACTORY = 
ConfigBuilder("spark.deploy.recoveryMode.factory")
     .version("1.2.0")
     .stringConf
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 272bcad3b19..81756d807b2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -45,6 +45,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
+import org.apache.spark.io.LZ4CompressionCodec
 import org.apache.spark.resource.{ResourceInformation, ResourceProfile, 
ResourceRequirement}
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
@@ -346,6 +347,45 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-46216: Recovery without compression") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "FILESYSTEM")
+    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+
+    var master: Master = null
+    try {
+      master = makeAliveMaster(conf)
+      val e = 
master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
+      assert(e.codec.isEmpty)
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+      }
+    }
+  }
+
+  test("SPARK-46216: Recovery with compression") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "FILESYSTEM")
+    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+    conf.set(RECOVERY_COMPRESSION_CODEC, "lz4")
+
+    var master: Master = null
+    try {
+      master = makeAliveMaster(conf)
+      val e = 
master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
+      assert(e.codec.get.isInstanceOf[LZ4CompressionCodec])
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+      }
+    }
+  }
+
   test("master/worker web ui available") {
     implicit val formats = org.json4s.DefaultFormats
     val conf = new SparkConf()
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
index 2f8e9a8eff2..a1df75b846d 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
@@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
 import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.util.Utils
@@ -52,7 +53,7 @@ object PersistenceEngineBenchmark extends BenchmarkBase {
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
 
     val numIters = 3
-    val numWorkers = 1000
+    val numWorkers = 2000
     val workers = (1 to numWorkers).map(createWorkerInfo).toArray
 
     conf.set(ZOOKEEPER_URL, zkTestServer.getConnectString)
@@ -73,7 +74,8 @@ object PersistenceEngineBenchmark extends BenchmarkBase {
 
       serializers.foreach { serializer =>
         val serializerName = serializer.getClass.getSimpleName
-        benchmark.addCase(s"FileSystemPersistenceEngine with $serializerName", 
numIters) { _ =>
+        val name = s"FileSystemPersistenceEngine with $serializerName"
+        benchmark.addCase(name, numIters) { _ =>
           val dir = Utils.createTempDir().getAbsolutePath
           val engine = new FileSystemPersistenceEngine(dir, serializer)
           workers.foreach(engine.addWorker)
@@ -81,6 +83,19 @@ object PersistenceEngineBenchmark extends BenchmarkBase {
           workers.foreach(engine.removeWorker)
           engine.close()
         }
+        CompressionCodec.ALL_COMPRESSION_CODECS.foreach { c =>
+          val codec = CompressionCodec.createCodec(conf, c)
+          val shortCodecName = CompressionCodec.getShortName(c)
+          val name = s"FileSystemPersistenceEngine with $serializerName 
($shortCodecName)"
+          benchmark.addCase(name, numIters) { _ =>
+            val dir = Utils.createTempDir().getAbsolutePath
+            val engine = new FileSystemPersistenceEngine(dir, serializer, 
Some(codec))
+            workers.foreach(engine.addWorker)
+            engine.read[WorkerInfo]("worker_")
+            workers.foreach(engine.removeWorker)
+            engine.close()
+          }
+        }
       }
 
       benchmark.addCase("BlackHolePersistenceEngine", numIters) { _ =>
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 8546f4e01f3..c02b84fc1a6 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}
 import org.apache.spark.util.Utils
@@ -74,6 +75,18 @@ class PersistenceEngineSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-46216: FileSystemPersistenceEngine with compression") {
+    val conf = new SparkConf()
+    CompressionCodec.ALL_COMPRESSION_CODECS.foreach { c =>
+      val codec = CompressionCodec.createCodec(conf, c)
+      withTempDir { dir =>
+        testPersistenceEngine(conf, serializer =>
+          new FileSystemPersistenceEngine(dir.getAbsolutePath, serializer, 
Some(codec))
+        )
+      }
+    }
+  }
+
   test("ZooKeeperPersistenceEngine") {
     val conf = new SparkConf()
     // TestingServer logs the port conflict exception rather than throwing an 
exception.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to