This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3da2e5c6324 [SPARK-46216][CORE] Improve `FileSystemPersistenceEngine` to support compressions 3da2e5c6324 is described below commit 3da2e5c632468ec7cf7001255c1a44197b46ce30 Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Sun Dec 3 00:26:16 2023 -0800 [SPARK-46216][CORE] Improve `FileSystemPersistenceEngine` to support compressions ### What changes were proposed in this pull request? This PR aims to improve `FileSystemPersistenceEngine` to support compression via a new configuration, `spark.deploy.recoveryCompressionCodec`. ### Why are the changes needed? To allow users to choose an appropriate compression codec for their workloads. For the `JavaSerializer` case, `LZ4` compression is about **2x** faster than the baseline (no compression). ``` OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure AMD EPYC 7763 64-Core Processor 2000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------------------------------------------------- ZooKeeperPersistenceEngine with JavaSerializer 2276 2360 115 0.0 1137909.6 1.0X ZooKeeperPersistenceEngine with KryoSerializer 1883 1906 34 0.0 941364.2 1.2X FileSystemPersistenceEngine with JavaSerializer 431 436 7 0.0 215436.9 5.3X FileSystemPersistenceEngine with JavaSerializer (lz4) 209 216 9 0.0 104404.1 10.9X FileSystemPersistenceEngine with JavaSerializer (lzf) 199 202 2 0.0 99489.5 11.4X FileSystemPersistenceEngine with JavaSerializer (snappy) 192 199 9 0.0 95872.9 11.9X FileSystemPersistenceEngine with JavaSerializer (zstd) 258 264 6 0.0 129249.4 8.8X FileSystemPersistenceEngine with KryoSerializer 139 151 13 0.0 69374.5 16.4X FileSystemPersistenceEngine with KryoSerializer (lz4) 159 165 8 0.0 79588.9 14.3X FileSystemPersistenceEngine with KryoSerializer (lzf) 180 195 18 0.0 89844.0 12.7X FileSystemPersistenceEngine with KryoSerializer 
(snappy) 164 183 18 0.0 82016.0 13.9X FileSystemPersistenceEngine with KryoSerializer (zstd) 206 218 11 0.0 102838.9 11.1X BlackHolePersistenceEngine 0 0 0 35.1 28.5 39908.5X ``` ### Does this PR introduce _any_ user-facing change? No, this is a new feature. ### How was this patch tested? Pass the CIs with the newly added test case. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44129 from dongjoon-hyun/SPARK-46216. Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../PersistenceEngineBenchmark-jdk21-results.txt | 22 ++++++++---- .../PersistenceEngineBenchmark-results.txt | 22 ++++++++---- .../master/FileSystemPersistenceEngine.scala | 10 ++++-- .../spark/deploy/master/RecoveryModeFactory.scala | 6 ++-- .../org/apache/spark/internal/config/Deploy.scala | 7 ++++ .../apache/spark/deploy/master/MasterSuite.scala | 40 ++++++++++++++++++++++ .../deploy/master/PersistenceEngineBenchmark.scala | 19 ++++++++-- .../deploy/master/PersistenceEngineSuite.scala | 13 +++++++ 8 files changed, 118 insertions(+), 21 deletions(-) diff --git a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt index 65dbfd0990d..38e74ed6b53 100644 --- a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt +++ b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt @@ -4,12 +4,20 @@ PersistenceEngineBenchmark OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure AMD EPYC 7763 64-Core Processor -1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- -ZooKeeperPersistenceEngine with JavaSerializer 1100 1255 150 0.0 1099532.9 1.0X -ZooKeeperPersistenceEngine with KryoSerializer 946 967 20 0.0 946367.3 1.2X -FileSystemPersistenceEngine with JavaSerializer 218 223 4 0.0 
217851.5 5.0X -FileSystemPersistenceEngine with KryoSerializer 79 87 12 0.0 78611.1 14.0X -BlackHolePersistenceEngine 0 0 0 42.0 23.8 46191.1X +2000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------------- +ZooKeeperPersistenceEngine with JavaSerializer 2254 2329 119 0.0 1126867.1 1.0X +ZooKeeperPersistenceEngine with KryoSerializer 1911 1912 1 0.0 955667.1 1.2X +FileSystemPersistenceEngine with JavaSerializer 438 448 15 0.0 218868.1 5.1X +FileSystemPersistenceEngine with JavaSerializer (lz4) 187 195 8 0.0 93337.8 12.1X +FileSystemPersistenceEngine with JavaSerializer (lzf) 193 216 20 0.0 96678.8 11.7X +FileSystemPersistenceEngine with JavaSerializer (snappy) 175 183 10 0.0 87652.3 12.9X +FileSystemPersistenceEngine with JavaSerializer (zstd) 243 255 14 0.0 121695.2 9.3X +FileSystemPersistenceEngine with KryoSerializer 150 160 15 0.0 75089.7 15.0X +FileSystemPersistenceEngine with KryoSerializer (lz4) 170 177 10 0.0 84996.7 13.3X +FileSystemPersistenceEngine with KryoSerializer (lzf) 192 203 12 0.0 96019.1 11.7X +FileSystemPersistenceEngine with KryoSerializer (snappy) 184 202 16 0.0 92241.3 12.2X +FileSystemPersistenceEngine with KryoSerializer (zstd) 232 238 5 0.0 116075.2 9.7X +BlackHolePersistenceEngine 0 0 0 27.3 36.6 30761.0X diff --git a/core/benchmarks/PersistenceEngineBenchmark-results.txt b/core/benchmarks/PersistenceEngineBenchmark-results.txt index eb63d07b4df..fc743da46a8 100644 --- a/core/benchmarks/PersistenceEngineBenchmark-results.txt +++ b/core/benchmarks/PersistenceEngineBenchmark-results.txt @@ -4,12 +4,20 @@ PersistenceEngineBenchmark OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure AMD EPYC 7763 64-Core Processor -1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
-------------------------------------------------------------------------------------------------------------------------------- -ZooKeeperPersistenceEngine with JavaSerializer 1202 1298 138 0.0 1201614.2 1.0X -ZooKeeperPersistenceEngine with KryoSerializer 951 1004 48 0.0 950559.0 1.3X -FileSystemPersistenceEngine with JavaSerializer 212 217 6 0.0 211623.2 5.7X -FileSystemPersistenceEngine with KryoSerializer 79 81 2 0.0 79132.5 15.2X -BlackHolePersistenceEngine 0 0 0 30.9 32.4 37109.8X +2000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------------- +ZooKeeperPersistenceEngine with JavaSerializer 2276 2360 115 0.0 1137909.6 1.0X +ZooKeeperPersistenceEngine with KryoSerializer 1883 1906 34 0.0 941364.2 1.2X +FileSystemPersistenceEngine with JavaSerializer 431 436 7 0.0 215436.9 5.3X +FileSystemPersistenceEngine with JavaSerializer (lz4) 209 216 9 0.0 104404.1 10.9X +FileSystemPersistenceEngine with JavaSerializer (lzf) 199 202 2 0.0 99489.5 11.4X +FileSystemPersistenceEngine with JavaSerializer (snappy) 192 199 9 0.0 95872.9 11.9X +FileSystemPersistenceEngine with JavaSerializer (zstd) 258 264 6 0.0 129249.4 8.8X +FileSystemPersistenceEngine with KryoSerializer 139 151 13 0.0 69374.5 16.4X +FileSystemPersistenceEngine with KryoSerializer (lz4) 159 165 8 0.0 79588.9 14.3X +FileSystemPersistenceEngine with KryoSerializer (lzf) 180 195 18 0.0 89844.0 12.7X +FileSystemPersistenceEngine with KryoSerializer (snappy) 164 183 18 0.0 82016.0 13.9X +FileSystemPersistenceEngine with KryoSerializer (zstd) 206 218 11 0.0 102838.9 11.1X +BlackHolePersistenceEngine 0 0 0 35.1 28.5 39908.5X diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala index 094136ea274..785367a0dee 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala @@ -23,6 +23,7 @@ import java.nio.file.{Files, Paths} import scala.reflect.ClassTag import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer} import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils @@ -37,7 +38,8 @@ import org.apache.spark.util.Utils */ private[master] class FileSystemPersistenceEngine( val dir: String, - val serializer: Serializer) + val serializer: Serializer, + val codec: Option[CompressionCodec] = None) extends PersistenceEngine with Logging { Files.createDirectories(Paths.get(dir)) @@ -62,7 +64,8 @@ private[master] class FileSystemPersistenceEngine( if (file.exists()) { throw new IllegalStateException("File already exists: " + file) } val created = file.createNewFile() if (!created) { throw new IllegalStateException("Could not create file: " + file) } - val fileOut = new FileOutputStream(file) + var fileOut: OutputStream = new FileOutputStream(file) + codec.foreach { c => fileOut = c.compressedOutputStream(fileOut) } var out: SerializationStream = null Utils.tryWithSafeFinally { out = serializer.newInstance().serializeStream(fileOut) @@ -76,7 +79,8 @@ private[master] class FileSystemPersistenceEngine( } private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = { - val fileIn = new FileInputStream(file) + var fileIn: InputStream = new FileInputStream(file) + codec.foreach { c => fileIn = c.compressedInputStream(new FileInputStream(file)) } var in: DeserializationStream = null try { in = serializer.newInstance().deserializeStream(fileIn) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala index 
470798793ce..c24c4e5fe6b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala @@ -20,7 +20,8 @@ package org.apache.spark.deploy.master import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.Deploy.RECOVERY_DIRECTORY +import org.apache.spark.internal.config.Deploy.{RECOVERY_COMPRESSION_CODEC, RECOVERY_DIRECTORY} +import org.apache.spark.io.CompressionCodec import org.apache.spark.serializer.Serializer /** @@ -57,7 +58,8 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: def createPersistenceEngine(): PersistenceEngine = { logInfo("Persisting recovery state to directory: " + recoveryDir) - new FileSystemPersistenceEngine(recoveryDir, serializer) + val codec = conf.get(RECOVERY_COMPRESSION_CODEC).map(c => CompressionCodec.createCodec(conf, c)) + new FileSystemPersistenceEngine(recoveryDir, serializer, codec) } def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = { diff --git a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala index ae9e8b79dfe..b52ea356789 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala @@ -39,6 +39,13 @@ private[spark] object Deploy { .checkValues(RecoverySerializer.values.map(_.toString)) .createWithDefault(RecoverySerializer.JAVA.toString) + val RECOVERY_COMPRESSION_CODEC = ConfigBuilder("spark.deploy.recoveryCompressionCodec") + .doc("A compression codec for persistence engines. none (default), lz4, lzf, snappy, and " + + "zstd. 
Currently, only FILESYSTEM mode supports this configuration.") + .version("4.0.0") + .stringConf + .createOptional + val RECOVERY_MODE_FACTORY = ConfigBuilder("spark.deploy.recoveryMode.factory") .version("1.2.0") .stringConf diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 272bcad3b19..81756d807b2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -45,6 +45,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Deploy._ import org.apache.spark.internal.config.UI._ import org.apache.spark.internal.config.Worker._ +import org.apache.spark.io.LZ4CompressionCodec import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceRequirement} import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} @@ -346,6 +347,45 @@ class MasterSuite extends SparkFunSuite } } + test("SPARK-46216: Recovery without compression") { + val conf = new SparkConf(loadDefaults = false) + conf.set(RECOVERY_MODE, "FILESYSTEM") + conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir")) + + var master: Master = null + try { + master = makeAliveMaster(conf) + val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine] + assert(e.codec.isEmpty) + } finally { + if (master != null) { + master.rpcEnv.shutdown() + master.rpcEnv.awaitTermination() + master = null + } + } + } + + test("SPARK-46216: Recovery with compression") { + val conf = new SparkConf(loadDefaults = false) + conf.set(RECOVERY_MODE, "FILESYSTEM") + conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir")) + conf.set(RECOVERY_COMPRESSION_CODEC, "lz4") + + var master: Master = null + try { + master = makeAliveMaster(conf) + val e = 
master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine] + assert(e.codec.get.isInstanceOf[LZ4CompressionCodec]) + } finally { + if (master != null) { + master.rpcEnv.shutdown() + master.rpcEnv.awaitTermination() + master = null + } + } + } + test("master/worker web ui available") { implicit val formats = org.json4s.DefaultFormats val conf = new SparkConf() diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala index 2f8e9a8eff2..a1df75b846d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala @@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer import org.apache.spark.SparkConf import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL +import org.apache.spark.io.CompressionCodec import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.util.Utils @@ -52,7 +53,7 @@ object PersistenceEngineBenchmark extends BenchmarkBase { override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { val numIters = 3 - val numWorkers = 1000 + val numWorkers = 2000 val workers = (1 to numWorkers).map(createWorkerInfo).toArray conf.set(ZOOKEEPER_URL, zkTestServer.getConnectString) @@ -73,7 +74,8 @@ object PersistenceEngineBenchmark extends BenchmarkBase { serializers.foreach { serializer => val serializerName = serializer.getClass.getSimpleName - benchmark.addCase(s"FileSystemPersistenceEngine with $serializerName", numIters) { _ => + val name = s"FileSystemPersistenceEngine with $serializerName" + benchmark.addCase(name, numIters) { _ => val dir = Utils.createTempDir().getAbsolutePath val engine = new 
FileSystemPersistenceEngine(dir, serializer) workers.foreach(engine.addWorker) @@ -81,6 +83,19 @@ object PersistenceEngineBenchmark extends BenchmarkBase { workers.foreach(engine.removeWorker) engine.close() } + CompressionCodec.ALL_COMPRESSION_CODECS.foreach { c => + val codec = CompressionCodec.createCodec(conf, c) + val shortCodecName = CompressionCodec.getShortName(c) + val name = s"FileSystemPersistenceEngine with $serializerName ($shortCodecName)" + benchmark.addCase(name, numIters) { _ => + val dir = Utils.createTempDir().getAbsolutePath + val engine = new FileSystemPersistenceEngine(dir, serializer, Some(codec)) + workers.foreach(engine.addWorker) + engine.read[WorkerInfo]("worker_") + workers.foreach(engine.removeWorker) + engine.close() + } + } } benchmark.addCase("BlackHolePersistenceEngine", numIters) { _ => diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala index 8546f4e01f3..c02b84fc1a6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala @@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL +import org.apache.spark.io.CompressionCodec import org.apache.spark.rpc.{RpcEndpoint, RpcEnv} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer} import org.apache.spark.util.Utils @@ -74,6 +75,18 @@ class PersistenceEngineSuite extends SparkFunSuite { } } + test("SPARK-46216: FileSystemPersistenceEngine with compression") { + val conf = new SparkConf() + CompressionCodec.ALL_COMPRESSION_CODECS.foreach { c => + val codec = CompressionCodec.createCodec(conf, c) + withTempDir { dir => + testPersistenceEngine(conf, serializer => + new 
FileSystemPersistenceEngine(dir.getAbsolutePath, serializer, Some(codec)) + ) + } + } + } + test("ZooKeeperPersistenceEngine") { val conf = new SparkConf() // TestingServer logs the port conflict exception rather than throwing an exception. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org