Repository: spark
Updated Branches:
  refs/heads/master 4f1dcd3dc -> bc167a2a5


[SPARK-928][CORE] Add support for Unsafe-based serializer in Kryo

## What changes were proposed in this pull request?
Now that we have migrated to Kryo 3.0.0 in
https://issues.apache.org/jira/browse/SPARK-11416, we can give users the option to
use the Unsafe-based SerDe. It can be turned on by setting `spark.kryo.unsafe` to `true`.

## How was this patch tested?
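Ran existing tests. Also added `UnsafeKryoSerializerSuite`, which reruns all of
`KryoSerializerSuite` with `spark.kryo.unsafe` set to `true`, and a `KryoBenchmark`: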

```
      Benchmark Kryo Unsafe vs safe Serialization: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      basicTypes: Int unsafe:true                    160 /  178         98.5          10.1       1.0X
      basicTypes: Long unsafe:true                   210 /  218         74.9          13.4       0.8X
      basicTypes: Float unsafe:true                  203 /  213         77.5          12.9       0.8X
      basicTypes: Double unsafe:true                 226 /  235         69.5          14.4       0.7X
      Array: Int unsafe:true                        1087 / 1101         14.5          69.1       0.1X
      Array: Long unsafe:true                       2758 / 2844          5.7         175.4       0.1X
      Array: Float unsafe:true                      1511 / 1552         10.4          96.1       0.1X
      Array: Double unsafe:true                     2942 / 2972          5.3         187.0       0.1X
      Map of string->Double unsafe:true             2645 / 2739          5.9         168.2       0.1X
      basicTypes: Int unsafe:false                   211 /  218         74.7          13.4       0.8X
      basicTypes: Long unsafe:false                  247 /  253         63.6          15.7       0.6X
      basicTypes: Float unsafe:false                 211 /  216         74.5          13.4       0.8X
      basicTypes: Double unsafe:false                227 /  233         69.2          14.4       0.7X
      Array: Int unsafe:false                       3012 / 3032          5.2         191.5       0.1X
      Array: Long unsafe:false                      4463 / 4515          3.5         283.8       0.0X
      Array: Float unsafe:false                     2788 / 2868          5.6         177.2       0.1X
      Array: Double unsafe:false                    3558 / 3752          4.4         226.2       0.0X
      Map of string->Double unsafe:false            2806 / 2933          5.6         178.4       0.1X
```

Author: Sandeep Singh <sand...@techaddict.me>
Author: Sandeep Singh <sand...@origamilogic.com>

Closes #12913 from techaddict/SPARK-928.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc167a2a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc167a2a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc167a2a

Branch: refs/heads/master
Commit: bc167a2a53f5a795d089e8a884569b1b3e2cd439
Parents: 4f1dcd3
Author: Sandeep Singh <sand...@techaddict.me>
Authored: Sat Oct 22 12:03:37 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sat Oct 22 12:03:37 2016 -0700

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       |  36 +++--
 .../apache/spark/serializer/KryoBenchmark.scala | 139 +++++++++++++++++++
 .../spark/serializer/KryoSerializerSuite.scala  |   1 +
 .../serializer/UnsafeKryoSerializerSuite.scala  |  33 +++++
 docs/configuration.md                           |   8 ++
 5 files changed, 206 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bc167a2a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 1fba552..0d26281 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -27,6 +27,7 @@ import scala.reflect.ClassTag
 
 import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => 
KryoClassSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, 
UnsafeOutput => KryoUnsafeOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => 
KryoJavaSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
 import org.apache.avro.generic.{GenericData, GenericRecord}
@@ -78,8 +79,15 @@ class KryoSerializer(conf: SparkConf)
     .filter(!_.isEmpty)
 
   private val avroSchemas = conf.getAvroSchema
+  // whether to use unsafe based IO for serialization
+  private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
 
-  def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, 
math.max(bufferSize, maxBufferSize))
+  def newKryoOutput(): KryoOutput =
+    if (useUnsafe) {
+      new KryoUnsafeOutput(bufferSize, math.max(bufferSize, maxBufferSize))
+    } else {
+      new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
+    }
 
   def newKryo(): Kryo = {
     val instantiator = new EmptyScalaKryoInstantiator
@@ -172,7 +180,7 @@ class KryoSerializer(conf: SparkConf)
   }
 
   override def newInstance(): SerializerInstance = {
-    new KryoSerializerInstance(this)
+    new KryoSerializerInstance(this, useUnsafe)
   }
 
   private[spark] override lazy val supportsRelocationOfSerializedObjects: 
Boolean = {
@@ -186,9 +194,12 @@ class KryoSerializer(conf: SparkConf)
 private[spark]
 class KryoSerializationStream(
     serInstance: KryoSerializerInstance,
-    outStream: OutputStream) extends SerializationStream {
+    outStream: OutputStream,
+    useUnsafe: Boolean) extends SerializationStream {
+
+  private[this] var output: KryoOutput =
+    if (useUnsafe) new KryoUnsafeOutput(outStream) else new 
KryoOutput(outStream)
 
-  private[this] var output: KryoOutput = new KryoOutput(outStream)
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
   override def writeObject[T: ClassTag](t: T): SerializationStream = {
@@ -219,9 +230,12 @@ class KryoSerializationStream(
 private[spark]
 class KryoDeserializationStream(
     serInstance: KryoSerializerInstance,
-    inStream: InputStream) extends DeserializationStream {
+    inStream: InputStream,
+    useUnsafe: Boolean) extends DeserializationStream {
+
+  private[this] var input: KryoInput =
+    if (useUnsafe) new KryoUnsafeInput(inStream) else new KryoInput(inStream)
 
-  private[this] var input: KryoInput = new KryoInput(inStream)
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
   override def readObject[T: ClassTag](): T = {
@@ -248,8 +262,8 @@ class KryoDeserializationStream(
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends 
SerializerInstance {
-
+private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: 
Boolean)
+  extends SerializerInstance {
   /**
    * A re-used [[Kryo]] instance. Methods will borrow this instance by calling 
`borrowKryo()`, do
    * their work, then release the instance by calling `releaseKryo()`. 
Logically, this is a caching
@@ -288,7 +302,7 @@ private[spark] class KryoSerializerInstance(ks: 
KryoSerializer) extends Serializ
 
   // Make these lazy vals to avoid creating a buffer unless we use them.
   private lazy val output = ks.newKryoOutput()
-  private lazy val input = new KryoInput()
+  private lazy val input = if (useUnsafe) new KryoUnsafeInput() else new 
KryoInput()
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
     output.clear()
@@ -329,11 +343,11 @@ private[spark] class KryoSerializerInstance(ks: 
KryoSerializer) extends Serializ
   }
 
   override def serializeStream(s: OutputStream): SerializationStream = {
-    new KryoSerializationStream(this, s)
+    new KryoSerializationStream(this, s, useUnsafe)
   }
 
   override def deserializeStream(s: InputStream): DeserializationStream = {
-    new KryoDeserializationStream(this, s)
+    new KryoDeserializationStream(this, s, useUnsafe)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bc167a2a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala 
b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
new file mode 100644
index 0000000..64be966
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.KryoTest._
+import org.apache.spark.util.Benchmark
+
+class KryoBenchmark extends SparkFunSuite {
+  val benchmark = new Benchmark("Benchmark Kryo Unsafe vs safe Serialization", 1024 * 1024 * 15, 10)
+
+  ignore("Benchmark Kryo Unsafe vs safe Serialization") {
+    Seq(true, false).foreach(runBenchmark)
+    benchmark.run()
+
+    // scalastyle:off
+    /* Sample output; Rate and Per Row appear to be based on the 15M count given to Benchmark.
+      Benchmark Kryo Unsafe vs safe Serialization: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      ------------------------------------------------------------------------------------------------
+      basicTypes: Int with unsafe:true               151 /  170        104.2           9.6       1.0X
+      basicTypes: Long with unsafe:true              175 /  191         89.8          11.1       0.9X
+      basicTypes: Float with unsafe:true             177 /  184         88.8          11.3       0.9X
+      basicTypes: Double with unsafe:true            193 /  216         81.4          12.3       0.8X
+      Array: Int with unsafe:true                    513 /  587         30.7          32.6       0.3X
+      Array: Long with unsafe:true                  1211 / 1358         13.0          77.0       0.1X
+      Array: Float with unsafe:true                  890 /  964         17.7          56.6       0.2X
+      Array: Double with unsafe:true                1335 / 1428         11.8          84.9       0.1X
+      Map of string->Double  with unsafe:true        931 /  988         16.9          59.2       0.2X
+      basicTypes: Int with unsafe:false              197 /  217         79.9          12.5       0.8X
+      basicTypes: Long with unsafe:false             219 /  240         71.8          13.9       0.7X
+      basicTypes: Float with unsafe:false            208 /  217         75.7          13.2       0.7X
+      basicTypes: Double with unsafe:false           208 /  225         75.6          13.2       0.7X
+      Array: Int with unsafe:false                  2559 / 2681          6.1         162.7       0.1X
+      Array: Long with unsafe:false                 3425 / 3516          4.6         217.8       0.0X
+      Array: Float with unsafe:false                2025 / 2134          7.8         128.7       0.1X
+      Array: Double with unsafe:false               2241 / 2358          7.0         142.5       0.1X
+      Map of string->Double  with unsafe:false      1044 / 1085         15.1          66.4       0.1X
+    */
+    // scalastyle:on
+  }
+
+  private def runBenchmark(useUnsafe: Boolean): Unit = {
+    def check[T: ClassTag](t: T, ser: SerializerInstance): Int = {
+      if (ser.deserialize[T](ser.serialize(t)) === t) 1 else 0
+    }
+
+    // Benchmark primitives: round-trip basicTypeCount random values of each type.
+    val basicTypeCount = 1000000
+    def basicTypes[T: ClassTag](name: String, gen: () => T): Unit = {
+      lazy val ser = createSerializer(useUnsafe)
+      val arrayOfBasicType: Array[T] = Array.fill(basicTypeCount)(gen())
+
+      benchmark.addCase(s"basicTypes: $name with unsafe:$useUnsafe") { _ =>
+        var sum = 0L
+        var i = 0
+        while (i < basicTypeCount) {
+          sum += check(arrayOfBasicType(i), ser)
+          i += 1
+        }
+        sum
+      }
+    }
+    basicTypes("Int", Random.nextInt)
+    basicTypes("Long", Random.nextLong)
+    basicTypes("Float", Random.nextFloat)
+    basicTypes("Double", Random.nextDouble)
+
+    // Benchmark primitive arrays: arrayCount arrays, each of random length in [0, arrayCount).
+    val arrayCount = 10000
+    def basicTypeArray[T: ClassTag](name: String, gen: () => T): Unit = {
+      lazy val ser = createSerializer(useUnsafe)
+      val arrayOfArrays: Array[Array[T]] =
+        Array.fill(arrayCount)(Array.fill[T](Random.nextInt(arrayCount))(gen()))
+
+      benchmark.addCase(s"Array: $name with unsafe:$useUnsafe") { _ =>
+        var sum = 0L
+        var i = 0
+        while (i < arrayCount) {
+          val arr = arrayOfArrays(i)
+          sum += check(arr, ser)
+          i += 1
+        }
+        sum
+      }
+    }
+    basicTypeArray("Int", Random.nextInt)
+    basicTypeArray("Long", Random.nextLong)
+    basicTypeArray("Float", Random.nextFloat)
+    basicTypeArray("Double", Random.nextDouble)
+
+    // Benchmark maps: mapsCount String->Double maps, each built from < mapsCount random entries.
+    val mapsCount = 1000
+    lazy val ser = createSerializer(useUnsafe)
+    val arrayOfMaps: Array[Map[String, Double]] = Array.fill(mapsCount) {
+      Array.fill(Random.nextInt(mapsCount)) {
+        (Random.nextString(mapsCount / 10), Random.nextDouble())
+      }.toMap
+    }
+
+    benchmark.addCase(s"Map of string->Double  with unsafe:$useUnsafe") { _ =>
+      var sum = 0L
+      var i = 0
+      while (i < mapsCount) {
+        val map = arrayOfMaps(i)
+        sum += check(map, ser)
+        i += 1
+      }
+      sum
+    }
+  }
+
+  def createSerializer(useUnsafe: Boolean): SerializerInstance = {
+    val conf = new SparkConf()
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
+    conf.set("spark.kryo.unsafe", useUnsafe.toString)
+
+    new KryoSerializer(conf).newInstance()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bc167a2a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index bc6e983..5040841 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.util.Utils
 class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
+  conf.set("spark.kryo.unsafe", "false")
 
   test("SPARK-7392 configuration limits") {
     val kryoBufferProperty = "spark.kryoserializer.buffer"

http://git-wip-us.apache.org/repos/asf/spark/blob/bc167a2a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
 
b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
new file mode 100644
index 0000000..d63a45a
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+class UnsafeKryoSerializerSuite extends KryoSerializerSuite {
+
+  // Reruns every test in KryoSerializerSuite with spark.kryo.unsafe set to "true"; the flag is
+  // written to conf before super.beforeAll() runs and reset before super.afterAll() runs.
+  override def beforeAll(): Unit = {
+    conf.set("spark.kryo.unsafe", "true")
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    conf.set("spark.kryo.unsafe", "false")
+    super.afterAll()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bc167a2a/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index a4a99d6..b07867d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -800,6 +800,14 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.kryo.unsafe</code></td>
+  <td>false</td>
+  <td>
+    Whether to use the Unsafe-based Kryo serializer. It can be
+    substantially faster because it uses Unsafe-based IO.
+  </td>
+</tr>
+<tr>
   <td><code>spark.kryoserializer.buffer.max</code></td>
   <td>64m</td>
   <td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to