This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new dd11075db618 [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` dd11075db618 is described below commit dd11075db61879e200b4121b83d4239954881ddd Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Tue Dec 5 00:49:21 2023 -0800 [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` ### What changes were proposed in this pull request? This PR aims to add `RocksDBPersistenceEngine`. ### Why are the changes needed? To speed up `Spark Master` HA operations by **6.1x**. ``` OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure AMD EPYC 7763 64-Core Processor 1000 Workers: Best Time(ms) Avg Time(ms) Relative -------------------------------------------------------------------------------------------- FileSystemPersistenceEngine with JavaSerializer 1571 1616 3.6X RocksDBPersistenceEngine with JavaSerializer 257 258 22.0X ``` ### Does this PR introduce _any_ user-facing change? No. This is a new backend. ### How was this patch tested? Pass the CIs with the newly added test cases. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44173 from dongjoon-hyun/SPARK-46258. 
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../PersistenceEngineBenchmark-jdk21-results.txt | 28 +++--- .../PersistenceEngineBenchmark-results.txt | 28 +++--- .../org/apache/spark/deploy/master/Master.scala | 4 + .../spark/deploy/master/RecoveryModeFactory.scala | 18 ++++ .../deploy/master/RocksDBPersistenceEngine.scala | 103 +++++++++++++++++++++ .../apache/spark/deploy/master/MasterSuite.scala | 20 ++++ .../deploy/master/PersistenceEngineBenchmark.scala | 11 +++ .../deploy/master/PersistenceEngineSuite.scala | 9 ++ 8 files changed, 195 insertions(+), 26 deletions(-) diff --git a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt index 314fb6958b69..99035eb336a3 100644 --- a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt +++ b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt @@ -6,18 +6,20 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure AMD EPYC 7763 64-Core Processor 1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------------------------------------------------- -ZooKeeperPersistenceEngine with JavaSerializer 5402 5546 233 0.0 5402030.8 1.0X -ZooKeeperPersistenceEngine with KryoSerializer 4185 4220 32 0.0 4184623.1 1.3X -FileSystemPersistenceEngine with JavaSerializer 1591 1634 37 0.0 1590836.4 3.4X -FileSystemPersistenceEngine with JavaSerializer (lz4) 611 623 14 0.0 611256.6 8.8X -FileSystemPersistenceEngine with JavaSerializer (lzf) 626 640 13 0.0 626072.2 8.6X -FileSystemPersistenceEngine with JavaSerializer (snappy) 595 628 29 0.0 594744.4 9.1X -FileSystemPersistenceEngine with JavaSerializer (zstd) 755 774 21 0.0 754604.4 7.2X -FileSystemPersistenceEngine with KryoSerializer 479 489 8 0.0 479404.7 11.3X -FileSystemPersistenceEngine with KryoSerializer (lz4) 392 406 12 
0.0 392165.7 13.8X -FileSystemPersistenceEngine with KryoSerializer (lzf) 525 536 14 0.0 524916.7 10.3X -FileSystemPersistenceEngine with KryoSerializer (snappy) 519 533 14 0.0 518569.3 10.4X -FileSystemPersistenceEngine with KryoSerializer (zstd) 627 663 31 0.0 627233.2 8.6X -BlackHolePersistenceEngine 0 0 0 6.0 166.0 32541.8X +ZooKeeperPersistenceEngine with JavaSerializer 5863 6053 265 0.0 5862988.1 1.0X +ZooKeeperPersistenceEngine with KryoSerializer 4553 4612 54 0.0 4553477.9 1.3X +FileSystemPersistenceEngine with JavaSerializer 1619 1632 17 0.0 1618500.5 3.6X +FileSystemPersistenceEngine with JavaSerializer (lz4) 619 631 10 0.0 619255.8 9.5X +FileSystemPersistenceEngine with JavaSerializer (lzf) 623 640 20 0.0 623222.4 9.4X +FileSystemPersistenceEngine with JavaSerializer (snappy) 553 596 37 0.0 553417.4 10.6X +FileSystemPersistenceEngine with JavaSerializer (zstd) 747 767 26 0.0 747197.1 7.8X +FileSystemPersistenceEngine with KryoSerializer 394 460 57 0.0 393534.6 14.9X +FileSystemPersistenceEngine with KryoSerializer (lz4) 368 406 33 0.0 367925.8 15.9X +FileSystemPersistenceEngine with KryoSerializer (lzf) 509 532 35 0.0 509170.2 11.5X +FileSystemPersistenceEngine with KryoSerializer (snappy) 515 540 28 0.0 515190.9 11.4X +FileSystemPersistenceEngine with KryoSerializer (zstd) 632 656 32 0.0 631522.4 9.3X +RocksDBPersistenceEngine with JavaSerializer 265 266 1 0.0 265026.0 22.1X +RocksDBPersistenceEngine with KryoSerializer 93 94 2 0.0 92732.3 63.2X +BlackHolePersistenceEngine 0 0 0 6.0 166.8 35151.7X diff --git a/core/benchmarks/PersistenceEngineBenchmark-results.txt b/core/benchmarks/PersistenceEngineBenchmark-results.txt index 64c8bb440842..6c33de480b1c 100644 --- a/core/benchmarks/PersistenceEngineBenchmark-results.txt +++ b/core/benchmarks/PersistenceEngineBenchmark-results.txt @@ -6,18 +6,20 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure AMD EPYC 7763 64-Core Processor 1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) 
Per Row(ns) Relative ---------------------------------------------------------------------------------------------------------------------------------------- -ZooKeeperPersistenceEngine with JavaSerializer 5463 5745 246 0.0 5463433.7 1.0X -ZooKeeperPersistenceEngine with KryoSerializer 4478 4513 31 0.0 4477926.8 1.2X -FileSystemPersistenceEngine with JavaSerializer 1650 1670 19 0.0 1649557.9 3.3X -FileSystemPersistenceEngine with JavaSerializer (lz4) 628 649 19 0.0 628265.9 8.7X -FileSystemPersistenceEngine with JavaSerializer (lzf) 545 575 28 0.0 544502.0 10.0X -FileSystemPersistenceEngine with JavaSerializer (snappy) 608 625 27 0.0 607926.2 9.0X -FileSystemPersistenceEngine with JavaSerializer (zstd) 805 814 8 0.0 804867.1 6.8X -FileSystemPersistenceEngine with KryoSerializer 420 461 40 0.0 420137.2 13.0X -FileSystemPersistenceEngine with KryoSerializer (lz4) 380 452 64 0.0 379742.4 14.4X -FileSystemPersistenceEngine with KryoSerializer (lzf) 518 543 31 0.0 518217.1 10.5X -FileSystemPersistenceEngine with KryoSerializer (snappy) 483 513 31 0.0 483112.6 11.3X -FileSystemPersistenceEngine with KryoSerializer (zstd) 668 679 9 0.0 668335.1 8.2X -BlackHolePersistenceEngine 0 0 0 5.8 172.5 31668.2X +ZooKeeperPersistenceEngine with JavaSerializer 6123 6309 172 0.0 6122561.9 1.0X +ZooKeeperPersistenceEngine with KryoSerializer 4676 4753 71 0.0 4675978.2 1.3X +FileSystemPersistenceEngine with JavaSerializer 1657 1679 20 0.0 1656526.3 3.7X +FileSystemPersistenceEngine with JavaSerializer (lz4) 641 657 18 0.0 641219.3 9.5X +FileSystemPersistenceEngine with JavaSerializer (lzf) 610 613 4 0.0 609684.2 10.0X +FileSystemPersistenceEngine with JavaSerializer (snappy) 615 641 23 0.0 615266.4 10.0X +FileSystemPersistenceEngine with JavaSerializer (zstd) 749 764 17 0.0 749140.8 8.2X +FileSystemPersistenceEngine with KryoSerializer 460 477 15 0.0 460196.8 13.3X +FileSystemPersistenceEngine with KryoSerializer (lz4) 403 439 32 0.0 402877.4 15.2X +FileSystemPersistenceEngine with 
KryoSerializer (lzf) 543 571 49 0.0 542685.4 11.3X +FileSystemPersistenceEngine with KryoSerializer (snappy) 498 507 16 0.0 497754.2 12.3X +FileSystemPersistenceEngine with KryoSerializer (zstd) 644 653 8 0.0 643776.5 9.5X +RocksDBPersistenceEngine with JavaSerializer 279 281 2 0.0 278935.2 21.9X +RocksDBPersistenceEngine with KryoSerializer 92 92 1 0.0 91713.1 66.8X +BlackHolePersistenceEngine 0 0 0 6.0 165.6 36965.5X diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 2e1d7b9bce33..e2c652f944c9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -186,6 +186,10 @@ private[deploy] class Master( val fsFactory = new FileSystemRecoveryModeFactory(conf, serializer) (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this)) + case "ROCKSDB" => + val rdbFactory = + new RocksDBRecoveryModeFactory(conf, serializer) + (rdbFactory.createPersistenceEngine(), rdbFactory.createLeaderElectionAgent(this)) case "CUSTOM" => val clazz = Utils.classForName(conf.get(RECOVERY_MODE_FACTORY)) val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer]) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala index c24c4e5fe6be..106acc9a7944 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala @@ -67,6 +67,24 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: } } +/** + * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual + * recovery is made by restoring from RocksDB. 
+ */ +private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer) + extends StandaloneRecoveryModeFactory(conf, serializer) with Logging { + + def createPersistenceEngine(): PersistenceEngine = { + val recoveryDir = conf.get(RECOVERY_DIRECTORY) + logInfo("Persisting recovery state to directory: " + recoveryDir) + new RocksDBPersistenceEngine(recoveryDir, serializer) + } + + def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = { + new MonarchyLeaderAgent(master) + } +} + private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer) extends StandaloneRecoveryModeFactory(conf, serializer) { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala new file mode 100644 index 000000000000..5c43dab4d066 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.master + +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets.UTF_8 +import java.nio.file.{Files, Paths} + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.rocksdb._ + +import org.apache.spark.internal.Logging +import org.apache.spark.serializer.Serializer + + +/** + * Stores data in RocksDB. + * + * @param dir Directory to setup RocksDB. Created if non-existent. + * @param serializer Used to serialize our objects. + */ +private[master] class RocksDBPersistenceEngine( + val dir: String, + val serializer: Serializer) + extends PersistenceEngine with Logging { + + RocksDB.loadLibrary() + + private val path = Files.createDirectories(Paths.get(dir)) + + /** + * Use full filter. + * Disable compression in index data. + * + * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format + */ + private val tableFormatConfig = new BlockBasedTableConfig() + .setFilterPolicy(new BloomFilter(10.0D, false)) + .setEnableIndexCompression(false) + .setIndexBlockRestartInterval(8) + .setFormatVersion(5) + + /** + * Use ZSTD at the bottom most level to reduce the disk space + * Use LZ4 at the other levels because it's better than Snappy in general. 
+ * + * https://github.com/facebook/rocksdb/wiki/Compression#configuration + */ + private val options = new Options() + .setCreateIfMissing(true) + .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION) + .setCompressionType(CompressionType.LZ4_COMPRESSION) + .setTableFormatConfig(tableFormatConfig) + + private val db: RocksDB = RocksDB.open(options, path.toString) + + override def persist(name: String, obj: Object): Unit = { + val serialized = serializer.newInstance().serialize(obj) + if (serialized.hasArray) { + db.put(name.getBytes(UTF_8), serialized.array()) + } else { + val bytes = new Array[Byte](serialized.remaining()) + serialized.get(bytes) + db.put(name.getBytes(UTF_8), bytes) + } + } + + override def unpersist(name: String): Unit = { + db.delete(name.getBytes(UTF_8)) + } + + override def read[T: ClassTag](name: String): Seq[T] = { + val result = new ArrayBuffer[T] + val iter = db.newIterator() + try { + iter.seek(name.getBytes(UTF_8)) + while (iter.isValid && new String(iter.key()).startsWith(name)) { + result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value()))) + iter.next() + } + } finally { + iter.close() + } + result.toSeq + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 81756d807b2e..9fd1991dab02 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -386,6 +386,26 @@ class MasterSuite extends SparkFunSuite } } + test("SPARK-46258: Recovery with RocksDB") { + val conf = new SparkConf(loadDefaults = false) + conf.set(RECOVERY_MODE, "ROCKSDB") + conf.set(RECOVERY_SERIALIZER, "Kryo") + conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir")) + + var master: Master = null + try { + master = makeAliveMaster(conf) + val e = 
master.invokePrivate(_persistenceEngine()).asInstanceOf[RocksDBPersistenceEngine] + assert(e.serializer.isInstanceOf[KryoSerializer]) + } finally { + if (master != null) { + master.rpcEnv.shutdown() + master.rpcEnv.awaitTermination() + master = null + } + } + } + test("master/worker web ui available") { implicit val formats = org.json4s.DefaultFormats val conf = new SparkConf() diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala index f538a1a06f6d..34a447efe528 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala @@ -110,6 +110,17 @@ object PersistenceEngineBenchmark extends BenchmarkBase { } } + serializers.foreach { serializer => + val serializerName = serializer.getClass.getSimpleName + val name = s"RocksDBPersistenceEngine with $serializerName" + benchmark.addCase(name, numIters) { _ => + val dir = Utils.createTempDir().getAbsolutePath + val engine = new RocksDBPersistenceEngine(dir, serializer) + writeAndRead(engine) + engine.close() + } + } + benchmark.addCase("BlackHolePersistenceEngine", numIters) { _ => val engine = new BlackHolePersistenceEngine() writeAndRead(engine) diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala index c02b84fc1a66..84181ea3fca3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala @@ -41,6 +41,15 @@ class PersistenceEngineSuite extends SparkFunSuite { } } + test("SPARK-46258: RocksDBPersistenceEngine") { + withTempDir { dir => + val conf = new SparkConf() + testPersistenceEngine(conf, serializer => + new 
RocksDBPersistenceEngine(dir.getAbsolutePath, serializer) + ) + } + } + test("SPARK-46191: FileSystemPersistenceEngine.persist error message for the existing file") { withTempDir { dir => val conf = new SparkConf() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org