This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5321353b24db [SPARK-47875][CORE] Remove `spark.deploy.recoverySerializer`
5321353b24db is described below

commit 5321353b24db247087890c44de06b9ad4e136473
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Tue Apr 16 16:47:23 2024 -0700

    [SPARK-47875][CORE] Remove `spark.deploy.recoverySerializer`
    
    ### What changes were proposed in this pull request?
    
    This is a logical revert of SPARK-46205
    - #44113
    - #44118
    
    ### Why are the changes needed?
    
    The initial implementation didn't handle the class initialization logic properly.
    Until we have a fix, I'd like to revert this from the `master` branch.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is not released yet.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #46087 from dongjoon-hyun/SPARK-47875.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../PersistenceEngineBenchmark-jdk21-results.txt   |  7 ------
 .../PersistenceEngineBenchmark-results.txt         |  7 ------
 .../org/apache/spark/deploy/master/Master.scala    |  7 ++----
 .../org/apache/spark/internal/config/Deploy.scala  | 14 ------------
 .../deploy/master/PersistenceEngineBenchmark.scala |  4 ++--
 .../deploy/master/PersistenceEngineSuite.scala     | 14 +-----------
 .../apache/spark/deploy/master/RecoverySuite.scala | 25 ++--------------------
 docs/spark-standalone.md                           | 12 ++---------
 8 files changed, 9 insertions(+), 81 deletions(-)

diff --git a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt 
b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
index 2a6bd778fc8a..ae4e0071adb0 100644
--- a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
+++ b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
@@ -7,19 +7,12 @@ AMD EPYC 7763 64-Core Processor
 1000 Workers:                                             Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
----------------------------------------------------------------------------------------------------------------------------------------
 ZooKeeperPersistenceEngine with JavaSerializer                     5036        
   5232         229          0.0     5035730.1       1.0X
-ZooKeeperPersistenceEngine with KryoSerializer                     4038        
   4053          16          0.0     4038447.8       1.2X
 FileSystemPersistenceEngine with JavaSerializer                    2902        
   2906           5          0.0     2902453.3       1.7X
 FileSystemPersistenceEngine with JavaSerializer (lz4)               816        
    829          19          0.0      816173.1       6.2X
 FileSystemPersistenceEngine with JavaSerializer (lzf)               755        
    780          33          0.0      755209.0       6.7X
 FileSystemPersistenceEngine with JavaSerializer (snappy)            814        
    832          16          0.0      813672.5       6.2X
 FileSystemPersistenceEngine with JavaSerializer (zstd)              987        
   1014          45          0.0      986834.7       5.1X
-FileSystemPersistenceEngine with KryoSerializer                     687        
    698          14          0.0      687313.5       7.3X
-FileSystemPersistenceEngine with KryoSerializer (lz4)               590        
    599          15          0.0      589867.9       8.5X
-FileSystemPersistenceEngine with KryoSerializer (lzf)               915        
    922           9          0.0      915432.2       5.5X
-FileSystemPersistenceEngine with KryoSerializer (snappy)            768        
    795          37          0.0      768494.4       6.6X
-FileSystemPersistenceEngine with KryoSerializer (zstd)              898        
    950          45          0.0      898118.6       5.6X
 RocksDBPersistenceEngine with JavaSerializer                        299        
    299           0          0.0      298800.0      16.9X
-RocksDBPersistenceEngine with KryoSerializer                        112        
    113           1          0.0      111779.6      45.1X
 BlackHolePersistenceEngine                                            0        
      0           0          5.5         180.3   27924.2X
 
 
diff --git a/core/benchmarks/PersistenceEngineBenchmark-results.txt 
b/core/benchmarks/PersistenceEngineBenchmark-results.txt
index da1838608de1..ec9a6fc1c8cf 100644
--- a/core/benchmarks/PersistenceEngineBenchmark-results.txt
+++ b/core/benchmarks/PersistenceEngineBenchmark-results.txt
@@ -7,19 +7,12 @@ AMD EPYC 7763 64-Core Processor
 1000 Workers:                                             Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
----------------------------------------------------------------------------------------------------------------------------------------
 ZooKeeperPersistenceEngine with JavaSerializer                     5192        
   5309         116          0.0     5192160.2       1.0X
-ZooKeeperPersistenceEngine with KryoSerializer                     4056        
   4059           5          0.0     4055626.8       1.3X
 FileSystemPersistenceEngine with JavaSerializer                    2926        
   2934           8          0.0     2926383.4       1.8X
 FileSystemPersistenceEngine with JavaSerializer (lz4)               820        
    827          11          0.0      820359.8       6.3X
 FileSystemPersistenceEngine with JavaSerializer (lzf)               772        
    781           9          0.0      772349.1       6.7X
 FileSystemPersistenceEngine with JavaSerializer (snappy)            802        
    812          10          0.0      801815.8       6.5X
 FileSystemPersistenceEngine with JavaSerializer (zstd)              972        
    994          31          0.0      972042.3       5.3X
-FileSystemPersistenceEngine with KryoSerializer                     708        
    726          15          0.0      707927.8       7.3X
-FileSystemPersistenceEngine with KryoSerializer (lz4)               584        
    596          11          0.0      583999.8       8.9X
-FileSystemPersistenceEngine with KryoSerializer (lzf)               880        
    896          14          0.0      880189.2       5.9X
-FileSystemPersistenceEngine with KryoSerializer (snappy)            772        
    821          46          0.0      772130.1       6.7X
-FileSystemPersistenceEngine with KryoSerializer (zstd)              906        
    928          29          0.0      905578.7       5.7X
 RocksDBPersistenceEngine with JavaSerializer                        302        
    302           0          0.0      301664.5      17.2X
-RocksDBPersistenceEngine with KryoSerializer                        109        
    111           2          0.0      108979.5      47.6X
 BlackHolePersistenceEngine                                            0        
      0           0          6.3         158.3   32800.5X
 
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 283443425635..e02d45105727 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -42,7 +42,7 @@ import org.apache.spark.internal.config.Worker._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.resource.{ResourceInformation, ResourceProfile, 
ResourceRequirement, ResourceUtils}
 import org.apache.spark.rpc._
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}
+import org.apache.spark.serializer.{JavaSerializer, Serializer}
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, 
Utils}
 import org.apache.spark.util.ArrayImplicits._
 
@@ -179,10 +179,7 @@ private[deploy] class Master(
     masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
     applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
 
-    val serializer = 
RecoverySerializer.withName(conf.get(RECOVERY_SERIALIZER)) match {
-      case RecoverySerializer.JAVA => new JavaSerializer(conf)
-      case RecoverySerializer.KRYO => new KryoSerializer(conf)
-    }
+    val serializer = new JavaSerializer(conf)
     val (persistenceEngine_, leaderElectionAgent_) = recoveryMode match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
index b09fbd7a5bb2..0c2db21905d1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
@@ -26,20 +26,6 @@ private[spark] object Deploy {
     .stringConf
     .createWithDefault("NONE")
 
-  object RecoverySerializer extends Enumeration {
-    val JAVA, KRYO = Value
-  }
-
-  val RECOVERY_SERIALIZER = ConfigBuilder("spark.deploy.recoverySerializer")
-    .doc("Serializer for writing/reading objects to/from persistence engines; 
" +
-      "JAVA or KRYO. Java serializer has been the default mode since Spark 
0.8.1." +
-      "KRYO serializer is a new fast and compact mode from Spark 4.0.0.")
-    .version("4.0.0")
-    .stringConf
-    .transform(_.toUpperCase(Locale.ROOT))
-    .checkValues(RecoverySerializer.values.map(_.toString))
-    .createWithDefault(RecoverySerializer.JAVA.toString)
-
   val RECOVERY_COMPRESSION_CODEC = 
ConfigBuilder("spark.deploy.recoveryCompressionCodec")
     .doc("A compression codec for persistence engines. none (default), lz4, 
lzf, snappy, and " +
       "zstd. Currently, only FILESYSTEM mode supports this configuration.")
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
index 34a447efe528..2a06ee5ed947 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.{DeployTestUtils, 
DriverDescription}
 import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.util.Utils
 
 
@@ -49,7 +49,7 @@ import org.apache.spark.util.Utils
 object PersistenceEngineBenchmark extends BenchmarkBase {
 
   val conf = new SparkConf()
-  val serializers = Seq(new JavaSerializer(conf), new KryoSerializer(conf))
+  val serializers = Seq(new JavaSerializer(conf))
   val zkTestServer = new TestingServer(findFreePort(conf))
 
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 01b7e46eb2a8..6839afdeeff8 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf, 
SparkFunSuite}
 import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}
+import org.apache.spark.serializer.{JavaSerializer, Serializer}
 import org.apache.spark.util.Utils
 
 class PersistenceEngineSuite extends SparkFunSuite {
@@ -103,18 +103,6 @@ class PersistenceEngineSuite extends SparkFunSuite {
     }
   }
 
-  test("SPARK-46205: Support KryoSerializer in FileSystemPersistenceEngine") {
-    withTempDir { dir =>
-      val conf = new SparkConf()
-      val serializer = new KryoSerializer(conf)
-      val engine = new FileSystemPersistenceEngine(dir.getAbsolutePath, 
serializer)
-      engine.persist("test_1", "test_1_value")
-      engine.read[String]("test_1")
-      engine.unpersist("test_1")
-      engine.close()
-    }
-  }
-
   test("SPARK-46216: FileSystemPersistenceEngine with compression") {
     val conf = new SparkConf()
     CompressionCodec.ALL_COMPRESSION_CODECS.foreach { c =>
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala
index 5e2939738cdf..18b22e7352c9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.io.LZ4CompressionCodec
 import org.apache.spark.resource.{ResourceInformation, ResourceProfile, 
ResourceRequirement}
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.JavaSerializer
 
 class RecoverySuite extends MasterSuiteBase {
   test("can use a custom recovery mode factory") {
@@ -474,26 +474,6 @@ class RecoverySuite extends MasterSuiteBase {
     }
   }
 
-  test("SPARK-46205: Recovery with Kryo Serializer") {
-    val conf = new SparkConf(loadDefaults = false)
-    conf.set(RECOVERY_MODE, "FILESYSTEM")
-    conf.set(RECOVERY_SERIALIZER, "Kryo")
-    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
-
-    var master: Master = null
-    try {
-      master = makeAliveMaster(conf)
-      val e = 
master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
-      assert(e.serializer.isInstanceOf[KryoSerializer])
-    } finally {
-      if (master != null) {
-        master.rpcEnv.shutdown()
-        master.rpcEnv.awaitTermination()
-        master = null
-      }
-    }
-  }
-
   test("SPARK-46216: Recovery without compression") {
     val conf = new SparkConf(loadDefaults = false)
     conf.set(RECOVERY_MODE, "FILESYSTEM")
@@ -536,14 +516,13 @@ class RecoverySuite extends MasterSuiteBase {
   test("SPARK-46258: Recovery with RocksDB") {
     val conf = new SparkConf(loadDefaults = false)
     conf.set(RECOVERY_MODE, "ROCKSDB")
-    conf.set(RECOVERY_SERIALIZER, "Kryo")
     conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
 
     var master: Master = null
     try {
       master = makeAliveMaster(conf)
       val e = 
master.invokePrivate(_persistenceEngine()).asInstanceOf[RocksDBPersistenceEngine]
-      assert(e.serializer.isInstanceOf[KryoSerializer])
+      assert(e.serializer.isInstanceOf[JavaSerializer])
     } finally {
       if (master != null) {
         master.rpcEnv.shutdown()
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 1eab3158e2e5..774c0bee3129 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -793,19 +793,11 @@ In order to enable this recovery mode, you can set 
SPARK_DAEMON_JAVA_OPTS in spa
     <td><code>spark.deploy.recoveryDirectory</code></td>
     <td>""</td>
     <td>The directory in which Spark will store recovery state, accessible 
from the Master's perspective.
-      Note that the directory should be clearly manualy if 
<code>spark.deploy.recoveryMode</code>,
-      <code>spark.deploy.recoverySerializer</code>, or 
<code>spark.deploy.recoveryCompressionCodec</code> is changed.
+      Note that the directory should be clearly manualy if 
<code>spark.deploy.recoveryMode</code>
+      or <code>spark.deploy.recoveryCompressionCodec</code> is changed.
     </td>
     <td>0.8.1</td>
   </tr>
-  <tr>
-    <td><code>spark.deploy.recoverySerializer</code></td>
-    <td>JAVA</td>
-    <td>A serializer for writing/reading objects to/from persistence engines; 
JAVA (default) or KRYO.
-      Java serializer has been the default mode since Spark 0.8.1.
-      Kryo serializer is a new fast and compact mode from Spark 4.0.0.</td>
-    <td>4.0.0</td>
-  </tr>
   <tr>
     <td><code>spark.deploy.recoveryCompressionCodec</code></td>
     <td>(none)</td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to