This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d40b1aea758 [SPARK-46205][CORE] Improve `PersistenceEngine` performance with `KryoSerializer`
0d40b1aea758 is described below

commit 0d40b1aea758b95a4416c8653599af8713a4aa16
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Fri Dec 1 18:29:42 2023 -0800

    [SPARK-46205][CORE] Improve `PersistenceEngine` performance with `KryoSerializer`
    
    ### What changes were proposed in this pull request?
    
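    This PR aims to improve `PersistenceEngine` performance with `KryoSerializer` by introducing a new configuration, `spark.deploy.recoverySerializer`, which selects the serializer (`JAVA`, the default, or `KRYO`) used to write/read recovery state to/from persistence engines.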
    
    ### Why are the changes needed?
    
    Allow users to choose a faster serializer for better performance in their environment. In particular, `KryoSerializer` is about **3x faster** than `JavaSerializer` with `FileSystemPersistenceEngine` (79 ms vs. 212 ms best time for 1000 workers in the benchmark below).
    
    ```
    ================================================================================================
    PersistenceEngineBenchmark
    ================================================================================================
    
    OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
    AMD EPYC 7763 64-Core Processor
    1000 Workers:                                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------------------------------------------
    ZooKeeperPersistenceEngine with JavaSerializer            1202           1298         138          0.0     1201614.2       1.0X
    ZooKeeperPersistenceEngine with KryoSerializer             951           1004          48          0.0      950559.0       1.3X
    FileSystemPersistenceEngine with JavaSerializer            212            217           6          0.0      211623.2       5.7X
    FileSystemPersistenceEngine with KryoSerializer             79             81           2          0.0       79132.5      15.2X
    BlackHolePersistenceEngine                                   0              0           0         30.9          32.4   37109.8X
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The default behavior is unchanged: `JavaSerializer` remains the default recovery serializer.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44113 from dongjoon-hyun/SPARK-46205.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../PersistenceEngineBenchmark-jdk21-results.txt   | 12 ++++++-----
 .../PersistenceEngineBenchmark-results.txt         | 12 ++++++-----
 .../master/FileSystemPersistenceEngine.scala       |  2 +-
 .../org/apache/spark/deploy/master/Master.scala    |  7 +++++--
 .../org/apache/spark/internal/config/Deploy.scala  | 16 +++++++++++++++
 .../apache/spark/deploy/master/MasterSuite.scala   | 22 ++++++++++++++++++++
 .../deploy/master/PersistenceEngineBenchmark.scala | 24 +++++++++++++++++++---
 .../deploy/master/PersistenceEngineSuite.scala     | 14 ++++++++++++-
 8 files changed, 92 insertions(+), 17 deletions(-)

diff --git a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt 
b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
index 3312d6feff88..65dbfd0990d3 100644
--- a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
+++ b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
@@ -4,10 +4,12 @@ PersistenceEngineBenchmark
 
 OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
-1000 Workers:                             Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
-ZooKeeperPersistenceEngine                         1183           1266         
129          0.0     1183158.2       1.0X
-FileSystemPersistenceEngine                         218            222         
  4          0.0      218005.2       5.4X
-BlackHolePersistenceEngine                            0              0         
  0         29.5          34.0   34846.9X
+1000 Workers:                                    Best Time(ms)   Avg Time(ms)  
 Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-------------------------------------------------------------------------------------------------------------------------------
+ZooKeeperPersistenceEngine with JavaSerializer            1100           1255  
       150          0.0     1099532.9       1.0X
+ZooKeeperPersistenceEngine with KryoSerializer             946            967  
        20          0.0      946367.3       1.2X
+FileSystemPersistenceEngine with JavaSerializer            218            223  
         4          0.0      217851.5       5.0X
+FileSystemPersistenceEngine with KryoSerializer             79             87  
        12          0.0       78611.1      14.0X
+BlackHolePersistenceEngine                                   0              0  
         0         42.0          23.8   46191.1X
 
 
diff --git a/core/benchmarks/PersistenceEngineBenchmark-results.txt 
b/core/benchmarks/PersistenceEngineBenchmark-results.txt
index 684963f92e1f..eb63d07b4dff 100644
--- a/core/benchmarks/PersistenceEngineBenchmark-results.txt
+++ b/core/benchmarks/PersistenceEngineBenchmark-results.txt
@@ -4,10 +4,12 @@ PersistenceEngineBenchmark
 
 OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
-1000 Workers:                             Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
-ZooKeeperPersistenceEngine                         1086           1215         
162          0.0     1085606.9       1.0X
-FileSystemPersistenceEngine                         224            225         
  1          0.0      223834.2       4.9X
-BlackHolePersistenceEngine                            0              0         
  0         40.7          24.6   44209.4X
+1000 Workers:                                    Best Time(ms)   Avg Time(ms)  
 Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-------------------------------------------------------------------------------------------------------------------------------
+ZooKeeperPersistenceEngine with JavaSerializer            1202           1298  
       138          0.0     1201614.2       1.0X
+ZooKeeperPersistenceEngine with KryoSerializer             951           1004  
        48          0.0      950559.0       1.3X
+FileSystemPersistenceEngine with JavaSerializer            212            217  
         6          0.0      211623.2       5.7X
+FileSystemPersistenceEngine with KryoSerializer             79             81  
         2          0.0       79132.5      15.2X
+BlackHolePersistenceEngine                                   0              0  
         0         30.9          32.4   37109.8X
 
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
 
b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index b184a71a90cd..7f624816f253 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -67,10 +67,10 @@ private[master] class FileSystemPersistenceEngine(
       out = serializer.newInstance().serializeStream(fileOut)
       out.writeObject(value)
     } {
-      fileOut.close()
       if (out != null) {
         out.close()
       }
+      fileOut.close()
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 976655f029a9..0fe72e28ea5b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -39,7 +39,7 @@ import org.apache.spark.internal.config.Worker._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, 
ResourceUtils}
 import org.apache.spark.rpc._
-import org.apache.spark.serializer.{JavaSerializer, Serializer}
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, 
Utils}
 import org.apache.spark.util.ArrayImplicits._
 
@@ -172,7 +172,10 @@ private[deploy] class Master(
     masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
     applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
 
-    val serializer = new JavaSerializer(conf)
+    val serializer = 
RecoverySerializer.withName(conf.get(RECOVERY_SERIALIZER)) match {
+      case RecoverySerializer.JAVA => new JavaSerializer(conf)
+      case RecoverySerializer.KRYO => new KryoSerializer(conf)
+    }
     val (persistenceEngine_, leaderElectionAgent_) = recoveryMode match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
index 7b35e92022ae..ae9e8b79dfe5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
@@ -17,12 +17,28 @@
 
 package org.apache.spark.internal.config
 
+import java.util.Locale
+
 private[spark] object Deploy {
   val RECOVERY_MODE = ConfigBuilder("spark.deploy.recoveryMode")
     .version("0.8.1")
     .stringConf
     .createWithDefault("NONE")
 
+  object RecoverySerializer extends Enumeration {
+    val JAVA, KRYO = Value
+  }
+
+  val RECOVERY_SERIALIZER = ConfigBuilder("spark.deploy.recoverySerializer")
+    .doc("Serializer for writing/reading objects to/from persistence engines; 
" +
+      "JAVA or KRYO. Java serializer has been the default mode since Spark 
0.8.1." +
+      "KRYO serializer is a new fast and compact mode from Spark 4.0.0.")
+    .version("4.0.0")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValues(RecoverySerializer.values.map(_.toString))
+    .createWithDefault(RecoverySerializer.JAVA.toString)
+
   val RECOVERY_MODE_FACTORY = 
ConfigBuilder("spark.deploy.recoveryMode.factory")
     .version("1.2.0")
     .stringConf
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 2e54673649c7..272bcad3b191 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -50,6 +50,7 @@ import 
org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.serializer
+import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
 object MockWorker {
@@ -325,6 +326,26 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-46205: Recovery with Kryo Serializer") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "FILESYSTEM")
+    conf.set(RECOVERY_SERIALIZER, "Kryo")
+    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+
+    var master: Master = null
+    try {
+      master = makeAliveMaster(conf)
+      val e = 
master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
+      assert(e.serializer.isInstanceOf[KryoSerializer])
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+      }
+    }
+  }
+
   test("master/worker web ui available") {
     implicit val formats = org.json4s.DefaultFormats
     val conf = new SparkConf()
@@ -805,6 +826,7 @@ class MasterSuite extends SparkFunSuite
   private val _newDriverId = PrivateMethod[String](Symbol("newDriverId"))
   private val _newApplicationId = 
PrivateMethod[String](Symbol("newApplicationId"))
   private val _createApplication = 
PrivateMethod[ApplicationInfo](Symbol("createApplication"))
+  private val _persistenceEngine = 
PrivateMethod[PersistenceEngine](Symbol("persistenceEngine"))
 
   private val workerInfo = makeWorkerInfo(4096, 10)
   private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
index 9917be9b1c09..730ae05fa146 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
@@ -26,7 +26,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
 import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
-import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.util.Utils
 
 
@@ -47,6 +47,7 @@ object PersistenceEngineBenchmark extends BenchmarkBase {
 
   val conf = new SparkConf()
   val serializerJava = new JavaSerializer(conf)
+  val serializerKryo = new KryoSerializer(conf)
   val zkTestServer = new TestingServer(findFreePort(conf))
 
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
@@ -60,7 +61,7 @@ object PersistenceEngineBenchmark extends BenchmarkBase {
     runBenchmark("PersistenceEngineBenchmark") {
       val benchmark = new Benchmark(s"$numWorkers Workers", numWorkers, output 
= output)
 
-      benchmark.addCase("ZooKeeperPersistenceEngine", numIters) { _ =>
+      benchmark.addCase("ZooKeeperPersistenceEngine with JavaSerializer", 
numIters) { _ =>
         val engine = new ZooKeeperPersistenceEngine(conf, serializerJava)
         workers.foreach(engine.addWorker)
         engine.read[WorkerInfo]("worker_")
@@ -68,7 +69,15 @@ object PersistenceEngineBenchmark extends BenchmarkBase {
         engine.close()
       }
 
-      benchmark.addCase("FileSystemPersistenceEngine", numIters) { _ =>
+      benchmark.addCase("ZooKeeperPersistenceEngine with KryoSerializer", 
numIters) { _ =>
+        val engine = new ZooKeeperPersistenceEngine(conf, serializerKryo)
+        workers.foreach(engine.addWorker)
+        engine.read[WorkerInfo]("worker_")
+        workers.foreach(engine.removeWorker)
+        engine.close()
+      }
+
+      benchmark.addCase("FileSystemPersistenceEngine with JavaSerializer", 
numIters) { _ =>
         val dir = Utils.createTempDir().getAbsolutePath
         val engine = new FileSystemPersistenceEngine(dir, serializerJava)
         workers.foreach(engine.addWorker)
@@ -77,6 +86,15 @@ object PersistenceEngineBenchmark extends BenchmarkBase {
         engine.close()
       }
 
+      benchmark.addCase("FileSystemPersistenceEngine with KryoSerializer", 
numIters) { _ =>
+        val dir = Utils.createTempDir().getAbsolutePath
+        val engine = new FileSystemPersistenceEngine(dir, serializerKryo)
+        workers.foreach(engine.addWorker)
+        engine.read[WorkerInfo]("worker_")
+        workers.foreach(engine.removeWorker)
+        engine.close()
+      }
+
       benchmark.addCase("BlackHolePersistenceEngine", numIters) { _ =>
         val engine = new BlackHolePersistenceEngine()
         workers.foreach(engine.addWorker)
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 68efdddc0873..66a61d80d2a9 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -26,7 +26,7 @@ import org.apache.curator.test.TestingServer
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
 import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
-import org.apache.spark.serializer.{JavaSerializer, Serializer}
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}
 import org.apache.spark.util.Utils
 
 class PersistenceEngineSuite extends SparkFunSuite {
@@ -53,6 +53,18 @@ class PersistenceEngineSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-46205: Support KryoSerializer in FileSystemPersistenceEngine") {
+    withTempDir { dir =>
+      val conf = new SparkConf()
+      val serializer = new KryoSerializer(conf)
+      val engine = new FileSystemPersistenceEngine(dir.getAbsolutePath, 
serializer)
+      engine.persist("test_1", "test_1_value")
+      engine.read[String]("test_1")
+      engine.unpersist("test_1")
+      engine.close()
+    }
+  }
+
   test("ZooKeeperPersistenceEngine") {
     val conf = new SparkConf()
     // TestingServer logs the port conflict exception rather than throwing an 
exception.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to