This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0e689611f099 [SPARK-46193][CORE][TESTS] Add `PersistenceEngineBenchmark` 0e689611f099 is described below commit 0e689611f09968c3a46689294184de29d097302b Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Fri Dec 1 02:52:29 2023 -0800 [SPARK-46193][CORE][TESTS] Add `PersistenceEngineBenchmark` ### What changes were proposed in this pull request? This PR aims to provide a new benchmark, `PersistenceEngineBenchmark`. ### Why are the changes needed? This is beneficial for both the developers and the users by providing a consistent measurement. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review. ``` $ build/sbt "core/Test/runMain org.apache.spark.deploy.master.PersistenceEngineBenchmark" ... [info] OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Mac OS X 14.2 [info] Apple M1 Max [info] 1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] ZooKeeperPersistenceEngine 11179 11198 20 0.0 11179348.5 1.0X [info] FileSystemPersistenceEngine 416 422 6 0.0 415745.2 26.9X [info] BlackHolePersistenceEngine 0 0 0 22.7 44.1 253597.7X ``` ``` $ bin/spark-submit --driver-memory 6g --class org.apache.spark.deploy.master.PersistenceEngineBenchmark --jars `find ~/Library/Caches/Coursier/v1 -name 'curator-test-*.jar'` core/target/scala-2.13/spark-core_2.13-4.0.0-SNAPSHOT-tests.jar ... 
OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Mac OS X 14.2 Apple M1 Max 1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ ZooKeeperPersistenceEngine 11565 11857 373 0.0 11564757.8 1.0X FileSystemPersistenceEngine 426 426 1 0.0 425605.0 27.2X BlackHolePersistenceEngine 0 0 0 27.4 36.5 316478.5X ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44102 from dongjoon-hyun/SPARK-46193. Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .github/workflows/benchmark.yml | 2 +- .../PersistenceEngineBenchmark-jdk21-results.txt | 13 +++ .../PersistenceEngineBenchmark-results.txt | 13 +++ .../deploy/master/PersistenceEngineBenchmark.scala | 114 +++++++++++++++++++++ 4 files changed, 141 insertions(+), 1 deletion(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 8e7551fa7738..3cb63404bcac 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -177,7 +177,7 @@ jobs: # In benchmark, we use local as master so set driver memory only. Note that GitHub Actions has 7 GB memory limit. bin/spark-submit \ --driver-memory 6g --class org.apache.spark.benchmark.Benchmarks \ - --jars "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \ + --jars "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`,`find ~/.cache/coursier -name 'curator-test-*.jar'`" \ "`find . 
-name 'spark-core*-SNAPSHOT-tests.jar'`" \ "${{ github.event.inputs.class }}" # To keep the directory structure and file permissions, tar them diff --git a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt new file mode 100644 index 000000000000..3312d6feff88 --- /dev/null +++ b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt @@ -0,0 +1,13 @@ +================================================================================================ +PersistenceEngineBenchmark +================================================================================================ + +OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure +AMD EPYC 7763 64-Core Processor +1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +ZooKeeperPersistenceEngine 1183 1266 129 0.0 1183158.2 1.0X +FileSystemPersistenceEngine 218 222 4 0.0 218005.2 5.4X +BlackHolePersistenceEngine 0 0 0 29.5 34.0 34846.9X + + diff --git a/core/benchmarks/PersistenceEngineBenchmark-results.txt b/core/benchmarks/PersistenceEngineBenchmark-results.txt new file mode 100644 index 000000000000..684963f92e1f --- /dev/null +++ b/core/benchmarks/PersistenceEngineBenchmark-results.txt @@ -0,0 +1,13 @@ +================================================================================================ +PersistenceEngineBenchmark +================================================================================================ + +OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure +AMD EPYC 7763 64-Core Processor +1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +ZooKeeperPersistenceEngine 1086 1215 162 0.0 1085606.9 1.0X 
+FileSystemPersistenceEngine 224 225 1 0.0 223834.2 4.9X +BlackHolePersistenceEngine 0 0 0 40.7 24.6 44209.4X + + diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala new file mode 100644 index 000000000000..9917be9b1c09 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.master + +import java.net.ServerSocket +import java.util.concurrent.ThreadLocalRandom + +import org.apache.curator.test.TestingServer + +import org.apache.spark.SparkConf +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL +import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.util.Utils + + +/** + * Benchmark for PersistenceEngines. + * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class <this class> --jars `find ~/.cache/coursier \ + * -name 'curator-test-*.jar'` <spark core test jar> + * 2. 
build/sbt "core/Test/runMain <this class>"
 *   3. generate result:
 *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
 *      Results will be written to "benchmarks/PersistenceEngineBenchmark-results.txt".
 * }}}
 *
 * Every case persists the same set of [[WorkerInfo]]s, reads them back with the "worker_"
 * prefix, removes them again and closes the engine, so the reported time covers one full
 * add/read/remove cycle per iteration.
 */
object PersistenceEngineBenchmark extends BenchmarkBase {

  val conf = new SparkConf()
  val serializerJava = new JavaSerializer(conf)
  // Created when this object is initialized; stopped in `afterAll`.
  val zkTestServer = new TestingServer(findFreePort(conf))

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {

    val numIters = 3
    val numWorkers = 1000
    val workers = (1 to numWorkers).map(createWorkerInfo).toArray

    // Point the ZooKeeper-backed engine at the embedded test server.
    conf.set(ZOOKEEPER_URL, zkTestServer.getConnectString)

    runBenchmark("PersistenceEngineBenchmark") {
      val benchmark = new Benchmark(s"$numWorkers Workers", numWorkers, output = output)

      benchmark.addCase("ZooKeeperPersistenceEngine", numIters) { _ =>
        addReadAndRemove(new ZooKeeperPersistenceEngine(conf, serializerJava), workers)
      }

      benchmark.addCase("FileSystemPersistenceEngine", numIters) { _ =>
        // Each iteration gets its own temporary directory.
        val dir = Utils.createTempDir().getAbsolutePath
        addReadAndRemove(new FileSystemPersistenceEngine(dir, serializerJava), workers)
      }

      benchmark.addCase("BlackHolePersistenceEngine", numIters) { _ =>
        addReadAndRemove(new BlackHolePersistenceEngine(), workers)
      }

      benchmark.run()
    }
  }

  /** Stops the embedded ZooKeeper test server started when this object was initialized. */
  override def afterAll(): Unit = {
    zkTestServer.stop()
  }

  /**
   * Runs one benchmark iteration against `engine`.
   *
   * The iteration persists every worker, reads all workers back, then removes every worker.
   * The engine is closed in a `finally` block, so it is released even when one of those
   * operations throws.
   *
   * @param engine  the persistence engine under test; it is always closed by this method
   * @param workers the workers to add, read back and remove
   */
  private def addReadAndRemove(engine: PersistenceEngine, workers: Array[WorkerInfo]): Unit = {
    try {
      workers.foreach(engine.addWorker)
      engine.read[WorkerInfo]("worker_")
      workers.foreach(engine.removeWorker)
    } finally {
      engine.close()
    }
  }

  /**
   * Builds a synthetic [[WorkerInfo]] for the benchmark.
   *
   * The worker's id ends with `id`. It advertises GPU addresses 0-2 and FPGA addresses 3-5,
   * and its `lastHeartbeat` is set to the current time.
   *
   * @param id numeric suffix that makes the worker id unique
   */
  private def createWorkerInfo(id: Int): WorkerInfo = {
    val gpuResource = new WorkerResourceInfo(GPU, Seq("0", "1", "2"))
    val fpgaResource = new WorkerResourceInfo(FPGA, Seq("3", "4", "5"))
    val resources = Map(GPU -> gpuResource, FPGA -> fpgaResource)
    val workerInfo = new WorkerInfo(s"worker-20231201000000-255.255.255.255-$id", "host", 8080, 4,
      1234, null, "http://publicAddress:80", resources)
    workerInfo.lastHeartbeat = System.currentTimeMillis()
    workerInfo
  }

  /**
   * Returns a port that could be bound at the time of the check.
   *
   * A random candidate in [1024, 65535] is handed to `Utils.startServiceOnPort`, which
   * presumably tries further ports if binding fails. Each trial opens a [[ServerSocket]] and
   * closes it immediately. The port is therefore not reserved, and another process could
   * take it before the caller binds it.
   *
   * @param conf configuration passed through to `Utils.startServiceOnPort`
   */
  def findFreePort(conf: SparkConf): Int = {
    val candidatePort = ThreadLocalRandom.current().nextInt(1024, 65536)
    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
      val socket = new ServerSocket(trialPort)
      socket.close()
      (null, trialPort)
    }, conf)._2
  }
}

--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org