This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0d40b1aea758 [SPARK-46205][CORE] Improve `PersistenceEngine` performance with `KryoSerializer` 0d40b1aea758 is described below commit 0d40b1aea758b95a4416c8653599af8713a4aa16 Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Fri Dec 1 18:29:42 2023 -0800 [SPARK-46205][CORE] Improve `PersistenceEngine` performance with `KryoSerializer` ### What changes were proposed in this pull request? This PR aims to improve `PersistenceEngine` performance with `KryoSerializer` by introducing a new configuration, `spark.deploy.recoverySerializer`. ### Why are the changes needed? Allow users to choose a better serializer to get better performance in their environment. In particular, `KryoSerializer` is about **3x faster** than `JavaSerializer` with `FileSystemPersistenceEngine`. ``` ================================================================================================ PersistenceEngineBenchmark ================================================================================================ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure AMD EPYC 7763 64-Core Processor 1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------- ZooKeeperPersistenceEngine with JavaSerializer 1202 1298 138 0.0 1201614.2 1.0X ZooKeeperPersistenceEngine with KryoSerializer 951 1004 48 0.0 950559.0 1.3X FileSystemPersistenceEngine with JavaSerializer 212 217 6 0.0 211623.2 5.7X FileSystemPersistenceEngine with KryoSerializer 79 81 2 0.0 79132.5 15.2X BlackHolePersistenceEngine 0 0 0 30.9 32.4 37109.8X ``` ### Does this PR introduce _any_ user-facing change? No. The default behavior is the same. ### How was this patch tested? Pass the CIs with the newly added test cases. ### Was this patch authored or co-authored using generative AI tooling? No.
Closes #44113 from dongjoon-hyun/SPARK-46205. Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../PersistenceEngineBenchmark-jdk21-results.txt | 12 ++++++----- .../PersistenceEngineBenchmark-results.txt | 12 ++++++----- .../master/FileSystemPersistenceEngine.scala | 2 +- .../org/apache/spark/deploy/master/Master.scala | 7 +++++-- .../org/apache/spark/internal/config/Deploy.scala | 16 +++++++++++++++ .../apache/spark/deploy/master/MasterSuite.scala | 22 ++++++++++++++++++++ .../deploy/master/PersistenceEngineBenchmark.scala | 24 +++++++++++++++++++--- .../deploy/master/PersistenceEngineSuite.scala | 14 ++++++++++++- 8 files changed, 92 insertions(+), 17 deletions(-) diff --git a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt index 3312d6feff88..65dbfd0990d3 100644 --- a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt +++ b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt @@ -4,10 +4,12 @@ PersistenceEngineBenchmark OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure AMD EPYC 7763 64-Core Processor -1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -ZooKeeperPersistenceEngine 1183 1266 129 0.0 1183158.2 1.0X -FileSystemPersistenceEngine 218 222 4 0.0 218005.2 5.4X -BlackHolePersistenceEngine 0 0 0 29.5 34.0 34846.9X +1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------------- +ZooKeeperPersistenceEngine with JavaSerializer 1100 1255 150 0.0 1099532.9 1.0X +ZooKeeperPersistenceEngine with KryoSerializer 946 967 20 0.0 946367.3 1.2X +FileSystemPersistenceEngine with JavaSerializer 218 223 4 0.0 217851.5 5.0X 
+FileSystemPersistenceEngine with KryoSerializer 79 87 12 0.0 78611.1 14.0X +BlackHolePersistenceEngine 0 0 0 42.0 23.8 46191.1X diff --git a/core/benchmarks/PersistenceEngineBenchmark-results.txt b/core/benchmarks/PersistenceEngineBenchmark-results.txt index 684963f92e1f..eb63d07b4dff 100644 --- a/core/benchmarks/PersistenceEngineBenchmark-results.txt +++ b/core/benchmarks/PersistenceEngineBenchmark-results.txt @@ -4,10 +4,12 @@ PersistenceEngineBenchmark OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure AMD EPYC 7763 64-Core Processor -1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -ZooKeeperPersistenceEngine 1086 1215 162 0.0 1085606.9 1.0X -FileSystemPersistenceEngine 224 225 1 0.0 223834.2 4.9X -BlackHolePersistenceEngine 0 0 0 40.7 24.6 44209.4X +1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------------- +ZooKeeperPersistenceEngine with JavaSerializer 1202 1298 138 0.0 1201614.2 1.0X +ZooKeeperPersistenceEngine with KryoSerializer 951 1004 48 0.0 950559.0 1.3X +FileSystemPersistenceEngine with JavaSerializer 212 217 6 0.0 211623.2 5.7X +FileSystemPersistenceEngine with KryoSerializer 79 81 2 0.0 79132.5 15.2X +BlackHolePersistenceEngine 0 0 0 30.9 32.4 37109.8X diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala index b184a71a90cd..7f624816f253 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala @@ -67,10 +67,10 @@ private[master] class FileSystemPersistenceEngine( out = 
serializer.newInstance().serializeStream(fileOut) out.writeObject(value) } { - fileOut.close() if (out != null) { out.close() } + fileOut.close() } } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 976655f029a9..0fe72e28ea5b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -39,7 +39,7 @@ import org.apache.spark.internal.config.Worker._ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils} import org.apache.spark.rpc._ -import org.apache.spark.serializer.{JavaSerializer, Serializer} +import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer} import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -172,7 +172,10 @@ private[deploy] class Master( masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) - val serializer = new JavaSerializer(conf) + val serializer = RecoverySerializer.withName(conf.get(RECOVERY_SERIALIZER)) match { + case RecoverySerializer.JAVA => new JavaSerializer(conf) + case RecoverySerializer.KRYO => new KryoSerializer(conf) + } val (persistenceEngine_, leaderElectionAgent_) = recoveryMode match { case "ZOOKEEPER" => logInfo("Persisting recovery state to ZooKeeper") diff --git a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala index 7b35e92022ae..ae9e8b79dfe5 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala @@ -17,12 +17,28 @@ package org.apache.spark.internal.config +import 
java.util.Locale + private[spark] object Deploy { val RECOVERY_MODE = ConfigBuilder("spark.deploy.recoveryMode") .version("0.8.1") .stringConf .createWithDefault("NONE") + object RecoverySerializer extends Enumeration { + val JAVA, KRYO = Value + } + + val RECOVERY_SERIALIZER = ConfigBuilder("spark.deploy.recoverySerializer") + .doc("Serializer for writing/reading objects to/from persistence engines; " + + "JAVA or KRYO. Java serializer has been the default mode since Spark 0.8.1." + + "KRYO serializer is a new fast and compact mode from Spark 4.0.0.") + .version("4.0.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(RecoverySerializer.values.map(_.toString)) + .createWithDefault(RecoverySerializer.JAVA.toString) + val RECOVERY_MODE_FACTORY = ConfigBuilder("spark.deploy.recoveryMode.factory") .version("1.2.0") .stringConf diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 2e54673649c7..272bcad3b191 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -50,6 +50,7 @@ import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.serializer +import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.Utils object MockWorker { @@ -325,6 +326,26 @@ class MasterSuite extends SparkFunSuite } } + test("SPARK-46205: Recovery with Kryo Serializer") { + val conf = new SparkConf(loadDefaults = false) + conf.set(RECOVERY_MODE, "FILESYSTEM") + conf.set(RECOVERY_SERIALIZER, "Kryo") + conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir")) + + var master: Master = null + try { + master = makeAliveMaster(conf) + val e = 
master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine] + assert(e.serializer.isInstanceOf[KryoSerializer]) + } finally { + if (master != null) { + master.rpcEnv.shutdown() + master.rpcEnv.awaitTermination() + master = null + } + } + } + test("master/worker web ui available") { implicit val formats = org.json4s.DefaultFormats val conf = new SparkConf() @@ -805,6 +826,7 @@ class MasterSuite extends SparkFunSuite private val _newDriverId = PrivateMethod[String](Symbol("newDriverId")) private val _newApplicationId = PrivateMethod[String](Symbol("newApplicationId")) private val _createApplication = PrivateMethod[ApplicationInfo](Symbol("createApplication")) + private val _persistenceEngine = PrivateMethod[PersistenceEngine](Symbol("persistenceEngine")) private val workerInfo = makeWorkerInfo(4096, 10) private val workerInfos = Array(workerInfo, workerInfo, workerInfo) diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala index 9917be9b1c09..730ae05fa146 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala @@ -26,7 +26,7 @@ import org.apache.spark.SparkConf import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} -import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.util.Utils @@ -47,6 +47,7 @@ object PersistenceEngineBenchmark extends BenchmarkBase { val conf = new SparkConf() val serializerJava = new JavaSerializer(conf) + val serializerKryo = new KryoSerializer(conf) val zkTestServer = new TestingServer(findFreePort(conf)) override def runBenchmarkSuite(mainArgs: 
Array[String]): Unit = { @@ -60,7 +61,7 @@ object PersistenceEngineBenchmark extends BenchmarkBase { runBenchmark("PersistenceEngineBenchmark") { val benchmark = new Benchmark(s"$numWorkers Workers", numWorkers, output = output) - benchmark.addCase("ZooKeeperPersistenceEngine", numIters) { _ => + benchmark.addCase("ZooKeeperPersistenceEngine with JavaSerializer", numIters) { _ => val engine = new ZooKeeperPersistenceEngine(conf, serializerJava) workers.foreach(engine.addWorker) engine.read[WorkerInfo]("worker_") @@ -68,7 +69,15 @@ object PersistenceEngineBenchmark extends BenchmarkBase { engine.close() } - benchmark.addCase("FileSystemPersistenceEngine", numIters) { _ => + benchmark.addCase("ZooKeeperPersistenceEngine with KryoSerializer", numIters) { _ => + val engine = new ZooKeeperPersistenceEngine(conf, serializerKryo) + workers.foreach(engine.addWorker) + engine.read[WorkerInfo]("worker_") + workers.foreach(engine.removeWorker) + engine.close() + } + + benchmark.addCase("FileSystemPersistenceEngine with JavaSerializer", numIters) { _ => val dir = Utils.createTempDir().getAbsolutePath val engine = new FileSystemPersistenceEngine(dir, serializerJava) workers.foreach(engine.addWorker) @@ -77,6 +86,15 @@ object PersistenceEngineBenchmark extends BenchmarkBase { engine.close() } + benchmark.addCase("FileSystemPersistenceEngine with KryoSerializer", numIters) { _ => + val dir = Utils.createTempDir().getAbsolutePath + val engine = new FileSystemPersistenceEngine(dir, serializerKryo) + workers.foreach(engine.addWorker) + engine.read[WorkerInfo]("worker_") + workers.foreach(engine.removeWorker) + engine.close() + } + benchmark.addCase("BlackHolePersistenceEngine", numIters) { _ => val engine = new BlackHolePersistenceEngine() workers.foreach(engine.addWorker) diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala index 68efdddc0873..66a61d80d2a9 
100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala @@ -26,7 +26,7 @@ import org.apache.curator.test.TestingServer import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL import org.apache.spark.rpc.{RpcEndpoint, RpcEnv} -import org.apache.spark.serializer.{JavaSerializer, Serializer} +import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer} import org.apache.spark.util.Utils class PersistenceEngineSuite extends SparkFunSuite { @@ -53,6 +53,18 @@ class PersistenceEngineSuite extends SparkFunSuite { } } + test("SPARK-46205: Support KryoSerializer in FileSystemPersistenceEngine") { + withTempDir { dir => + val conf = new SparkConf() + val serializer = new KryoSerializer(conf) + val engine = new FileSystemPersistenceEngine(dir.getAbsolutePath, serializer) + engine.persist("test_1", "test_1_value") + engine.read[String]("test_1") + engine.unpersist("test_1") + engine.close() + } + } + test("ZooKeeperPersistenceEngine") { val conf = new SparkConf() // TestingServer logs the port conflict exception rather than throwing an exception. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org