[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22855 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r232401327 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -101,13 +101,29 @@ class KryoSerializer(conf: SparkConf) } } - @transient - var pool: KryoPool = getPool + private class PoolWrapper extends KryoPool { +private var pool: KryoPool = getPool + +override def borrow(): Kryo = pool.borrow() + +override def release(kryo: Kryo): Unit = pool.release(kryo) + +override def run[T](kryoCallback: KryoCallback[T]): T = pool.run(kryoCallback) + +def reset(): Unit = { + pool = getPool --- End diff -- OK, and it only gets recreated when a new classloader is set, it seems, so it is rare.
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r232375758 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -101,13 +101,29 @@ class KryoSerializer(conf: SparkConf) } } - @transient - var pool: KryoPool = getPool + private class PoolWrapper extends KryoPool { +private var pool: KryoPool = getPool + +override def borrow(): Kryo = pool.borrow() + +override def release(kryo: Kryo): Unit = pool.release(kryo) + +override def run[T](kryoCallback: KryoCallback[T]): T = pool.run(kryoCallback) + +def reset(): Unit = { + pool = getPool --- End diff -- The KryoPool interface exposes no way to free it, so I believe that is not necessary
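The reset-by-replacement idea discussed in this thread can be sketched without Kryo. In this minimal sketch, `SimplePool` and `ResettablePool` are hypothetical names, not Spark or Kryo classes. The wrapper delegates to an inner pool, and `reset()` swaps in a fresh one. Because the inner pool offers no way to free itself, the old pool is simply dropped and left to the garbage collector. The `@volatile` on the field is an assumption added here so that other threads see the swap; the PR's `PoolWrapper` uses a plain `private var`.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical minimal pool: reuses released instances, creating new ones on demand.
// Like the KryoPool interface discussed above, it offers no way to free or clear itself.
final class SimplePool[T <: AnyRef](create: () => T) {
  private val idle = new ConcurrentLinkedQueue[T]()

  def borrow(): T = {
    val pooled = idle.poll() // null when no idle instance is available
    if (pooled != null) pooled else create()
  }

  def release(item: T): Unit = { idle.offer(item); () }
}

// Hypothetical wrapper in the spirit of the PR's PoolWrapper: it delegates to an inner
// pool, and reset() replaces that pool. The old pool and its idle instances become
// unreachable and are reclaimed by the garbage collector; nothing needs to be freed.
final class ResettablePool[T <: AnyRef](create: () => T) {
  @volatile private var pool = new SimplePool[T](create)

  def borrow(): T = pool.borrow()
  def release(item: T): Unit = pool.release(item)
  def reset(): Unit = { pool = new SimplePool[T](create) }
}
```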
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r232332531 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -101,13 +101,29 @@ class KryoSerializer(conf: SparkConf) } } - @transient - var pool: KryoPool = getPool + private class PoolWrapper extends KryoPool { +private var pool: KryoPool = getPool + +override def borrow(): Kryo = pool.borrow() + +override def release(kryo: Kryo): Unit = pool.release(kryo) + +override def run[T](kryoCallback: KryoCallback[T]): T = pool.run(kryoCallback) + +def reset(): Unit = { + pool = getPool --- End diff -- OK. We could look at updating to Kryo 5 in Spark 3, too, if there were other benefits. Does the old pool need to get freed or anything here?
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r232330079 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { --- End diff -- This was needed
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r232330015 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,36 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { +override def create: Kryo = { + newKryo() +} + } + + private class PoolWrapper extends KryoPool { --- End diff -- This wrapper can be removed when Kryo is updated to 5.0, as the new pool implementation exposes the needed method to clear the pool.
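For contrast, here is a sketch of a pool that can empty itself in place, which is the capability the comment above attributes to the Kryo 5 pool. `ClearablePool` is a hypothetical stand-alone class, not Kryo's API. Note that `clear()` only drops idle instances: anything currently borrowed is untracked and can still be released back afterwards.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical pool with an in-place clear(), so no wrapper or var swap is needed.
final class ClearablePool[T <: AnyRef](create: () => T) {
  private val idle = new ConcurrentLinkedQueue[T]()

  def borrow(): T = Option(idle.poll()).getOrElse(create())
  def release(item: T): Unit = { idle.offer(item); () }

  // Drops every idle instance. Borrowed instances are not tracked, so they can still be
  // released back into the pool after clear() returns.
  def clear(): Unit = idle.clear()
  def idleCount: Int = idle.size()
}
```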
Github user wangyum commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r232203634 --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import scala.concurrent._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.serializer.KryoTest._ +import org.apache.spark.util.ThreadUtils + +/** + * Benchmark for KryoPool vs old "pool of 1". + * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class --jars + * 2. build/sbt "core/test:runMain " + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain " + * Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt". + * }}} + */ +object KryoSerializerBenchmark extends BenchmarkBase { --- End diff -- cc @dongjoon-hyun for Benchmark change. 
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r231558355 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -214,8 +230,14 @@ class KryoSerializer(conf: SparkConf) kryo } + override def setDefaultClassLoader(classLoader: ClassLoader): Serializer = { +defaultClassLoader = Some(classLoader) --- End diff -- defaultClassLoader is used in newKryo. I called `getPool` after setting the defaultClassLoader to make sure we don't accidentally call newKryo before the defaultClassLoader is updated. I set it on line 105 because I don't believe `setDefaultClassLoader` is required to be called. The issue, which I unfortunately didn't notice until these tests failed, is that the tests specify that you can call `setDefaultClassLoader` after serializing an object (I'm not sure this functionality is actually used), which leaves an "incorrect" Kryo instance in the pool. Unfortunately, the pool doesn't expose a way to clear itself out, hence the var, which clearly doesn't work. I will work on a solution.
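The ordering problem described above, together with the `super.setDefaultClassLoader(classLoader)` suggestion made elsewhere in this review, can be combined in a small sketch. `BaseSerializer`, `FakeKryo`, and `PooledSerializer` are hypothetical stand-ins; the base class only mimics the shape of Spark's `Serializer.setDefaultClassLoader`. Each pooled instance records the loader it was built with, and the override updates the loader through `super` before discarding idle instances built with the old one.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical base class shaped like Spark's Serializer.setDefaultClassLoader.
abstract class BaseSerializer {
  @volatile protected var defaultClassLoader: Option[ClassLoader] = None

  def setDefaultClassLoader(classLoader: ClassLoader): BaseSerializer = {
    defaultClassLoader = Some(classLoader)
    this
  }
}

// Stand-in for a Kryo instance: it remembers the class loader it was created with.
final case class FakeKryo(loader: Option[ClassLoader])

final class PooledSerializer extends BaseSerializer {
  private val idle = new ConcurrentLinkedQueue[FakeKryo]()

  // Plays the role of newKryo(): reads defaultClassLoader at creation time.
  private def newInstance(): FakeKryo = FakeKryo(defaultClassLoader)

  def borrow(): FakeKryo = Option(idle.poll()).getOrElse(newInstance())
  def release(k: FakeKryo): Unit = { idle.offer(k); () }

  override def setDefaultClassLoader(classLoader: ClassLoader): BaseSerializer = {
    super.setDefaultClassLoader(classLoader) // reuse the parent's field update
    idle.clear()                             // then drop instances built with the old loader
    this
  }
}
```

Without the `idle.clear()` call, a borrow after the loader change would hand back the instance created before it, which is the "incorrect" Kryo instance described in the comment.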
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r231554649 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,20 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { +override def create: Kryo = { + newKryo() +} + } + + @transient --- End diff -- Yes, you are correct; I don't think this will work.
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r231526283 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,20 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { +override def create: Kryo = { + newKryo() +} + } + + @transient --- End diff -- Back on this item now: if this is no longer a lazy val and it's transient, how does it get set again after this object itself is serialized? I'm mostly wondering if this is required now.
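The question above turns on how Java serialization treats `@transient` fields. In this sketch, `Holder` and `roundTrip` are hypothetical helpers. After a serialize/deserialize round trip the class's constructor does not run again, so a `@transient val` comes back as `null`, while a `@transient lazy val` is recomputed on first access.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a serializer object that holds a transient pool.
final class Holder extends Serializable {
  // Initialized only by the constructor, which does not run on deserialization.
  @transient val eagerPool: java.util.ArrayList[String] = new java.util.ArrayList[String]()
  // Initialized on first access, so it is rebuilt after deserialization.
  @transient lazy val lazyPool: java.util.ArrayList[String] = new java.util.ArrayList[String]()
}

// Java-serializes an object and reads it back, similar to shipping it to another JVM.
def roundTrip[T <: AnyRef](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(obj) finally out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}
```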
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r231526925 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -214,8 +230,14 @@ class KryoSerializer(conf: SparkConf) kryo } + override def setDefaultClassLoader(classLoader: ClassLoader): Serializer = { +defaultClassLoader = Some(classLoader) --- End diff -- You can write `super.setDefaultClassLoader(classLoader)` here to inherit the behavior rather than duplicate it. It is just one line now, yes. Where is defaultClassLoader used in this implementation though? I wonder why it matters that you call `getPool` after this field is set. And if it does, isn't setting it in line 105 not actually helping?
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r231213606 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -298,30 +309,40 @@ class KryoDeserializationStream( } } -private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean) +private[spark] class KryoSerializerInstance( + ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean) extends SerializerInstance { /** * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are * not synchronized. */ - @Nullable private[this] var cachedKryo: Kryo = borrowKryo() + @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo() /** * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance; * otherwise, it allocates a new instance. */ private[serializer] def borrowKryo(): Kryo = { -if (cachedKryo != null) { - val kryo = cachedKryo - // As a defensive measure, call reset() to clear any Kryo state that might have been modified - // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue) +if (usePool) { + val kryo = ks.pool.borrow() kryo.reset() - cachedKryo = null kryo } else { - ks.newKryo() + if (cachedKryo != null) { +val kryo = cachedKryo +/** + * As a defensive measure, call reset() to clear any Kryo state that might have --- End diff -- Another total nit, not worth touching unless you make other changes, but this is scaladoc-style. Multi-line comments are often just commented with `//` as before.
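The two borrowing strategies in the diff above can be modeled with plain Scala. `Worker` stands in for a Kryo instance and `Instance` for `KryoSerializerInstance`; both names, and using a `ConcurrentLinkedQueue` as the shared pool, are assumptions for illustration. With `usePool = true`, workers are shared across serializer instances and created lazily; otherwise each serializer instance eagerly caches its own, as the original "pool of size one" did.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Stand-in for a Kryo instance with state that must be cleared between uses.
final class Worker {
  var state: Int = 0
  def reset(): Unit = { state = 0 }
}

// Hypothetical model of KryoSerializerInstance's borrow/release logic.
final class Instance(shared: ConcurrentLinkedQueue[Worker], usePool: Boolean, newWorker: () => Worker) {
  // Without the pool, each instance eagerly caches one worker ("pool of size one").
  private var cached: Worker = if (usePool) null else newWorker()

  def borrow(): Worker = {
    val w =
      if (usePool) Option(shared.poll()).getOrElse(newWorker())
      else if (cached != null) { val c = cached; cached = null; c }
      else newWorker()
    w.reset() // defensive reset on every borrow (cf. SPARK-7766); a no-op for a fresh worker
    w
  }

  def release(w: Worker): Unit =
    if (usePool) { shared.offer(w); () }
    else if (cached == null) cached = w
}
```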
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r231213322 --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala --- @@ -431,9 +434,11 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { ser.deserialize[HashMap[Int, List[String]]](serializedMap) } - private def testSerializerInstanceReuse(autoReset: Boolean, referenceTracking: Boolean): Unit = { + private def testSerializerInstanceReuse( + autoReset: Boolean, referenceTracking: Boolean, usePool: Boolean): Unit = { --- End diff -- Total nit: there are a couple of continuation indents that are 3 rather than 4 spaces. Don't bother, unless you need to make other updates.
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r230422530 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf) private val avroSchemas = conf.getAvroSchema // whether to use unsafe based IO for serialization private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false) + private val usePool = conf.getBoolean("spark.kryo.pool", false) --- End diff -- Sounds good, and done.
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r230421073 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf) private val avroSchemas = conf.getAvroSchema // whether to use unsafe based IO for serialization private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false) + private val usePool = conf.getBoolean("spark.kryo.pool", false) --- End diff -- Yeah, I think that's a fine position to take if we can't think of a reason to disable it other than the theoretical unknown-unknown bug out there.
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r230420266 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf) private val avroSchemas = conf.getAvroSchema // whether to use unsafe based IO for serialization private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false) + private val usePool = conf.getBoolean("spark.kryo.pool", false) --- End diff -- Okay, if I'm understanding you correctly, I should set the default to true and remove the documentation. I will go ahead and do that; if I misunderstood, let me know. Thanks
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r230052150 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf) private val avroSchemas = conf.getAvroSchema // whether to use unsafe based IO for serialization private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false) + private val usePool = conf.getBoolean("spark.kryo.pool", false) --- End diff -- I would not document it. This is just a safety valve. In theory, there's no reason to disable this nor would a caller know why to disable it.
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229763892 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf) private val avroSchemas = conf.getAvroSchema // whether to use unsafe based IO for serialization private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false) + private val usePool = conf.getBoolean("spark.kryo.pool", false) --- End diff -- I had already added this to docs/configuration.md; should I remove it?
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229763610 --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala --- @@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling // reference-tracking would lead to corrupted output when serializer instances are re-used - for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) { -test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") { - testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking) + for { +referenceTracking <- Set(true, false) --- End diff -- Might as well fix while I'm here
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229762873 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { --- End diff -- Thanks, I'll remove it. I had a problem previously, but it looks like the @transient on the pool is enough by itself.
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229762474 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { +override def create: Kryo = { + newKryo() +} + } + + @transient + lazy val pool = new KryoPool.Builder(factory).softReferences.build --- End diff -- This is used directly by KryoSerializerInstance
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229738657 --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala --- @@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling // reference-tracking would lead to corrupted output when serializer instances are re-used - for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) { -test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") { - testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking) + for { +referenceTracking <- Set(true, false) +autoReset <- Set(true, false) +usePool <- Set(true, false) + } { +test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking" + + s", usePool = $usePool") { + testSerializerInstanceReuse( +autoReset = autoReset, referenceTracking = referenceTracking, usePool = usePool) +} + } + + test("SPARK-25839 KryoPool implementation works correctly in multi-threaded environment") { +import java.util.concurrent.Executors --- End diff -- I'd import at the top of the file.
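A multi-threaded check in the spirit of the test being added here can be approximated without Spark. This hypothetical sketch (`Guarded` and `stressPool` are illustrative names, not the PR's test code) runs many borrow/use/release cycles on a fixed thread pool and reports whether any pooled instance was ever in use by two threads at once. Following the review comment, the `java.util.concurrent` imports sit at the top rather than inside the test body.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical pooled resource that detects being used by two threads at once.
final class Guarded {
  private val inUse = new AtomicBoolean(false)

  def use(): Boolean =
    if (!inUse.compareAndSet(false, true)) false // another thread holds it: a pool bug
    else {
      try { Thread.sleep(1); true } finally inUse.set(false)
    }
}

// Runs `tasks` borrow/use/release cycles on `threads` threads. Returns true only if all
// tasks finished and no instance was ever observed in use by two threads simultaneously.
def stressPool(threads: Int, tasks: Int): Boolean = {
  val idle = new ConcurrentLinkedQueue[Guarded]()
  val ok = new AtomicBoolean(true)
  val executor = Executors.newFixedThreadPool(threads)
  try {
    for (_ <- 1 to tasks) {
      executor.submit(new Runnable {
        override def run(): Unit = {
          val g = Option(idle.poll()).getOrElse(new Guarded) // borrow
          try { if (!g.use()) ok.set(false) } finally { idle.offer(g); () } // release
        }
      })
    }
  } finally {
    executor.shutdown()
  }
  executor.awaitTermination(1, TimeUnit.MINUTES) && ok.get()
}
```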
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229740163 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf) private val avroSchemas = conf.getAvroSchema // whether to use unsafe based IO for serialization private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false) + private val usePool = conf.getBoolean("spark.kryo.pool", false) --- End diff -- Yep, I'd leave this undocumented
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229739685 --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala --- @@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling // reference-tracking would lead to corrupted output when serializer instances are re-used - for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) { -test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") { - testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking) + for { +referenceTracking <- Set(true, false) +autoReset <- Set(true, false) +usePool <- Set(true, false) + } { +test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking" + + s", usePool = $usePool") { + testSerializerInstanceReuse( +autoReset = autoReset, referenceTracking = referenceTracking, usePool = usePool) --- End diff -- Likewise, I see this is just how the code was written before, but the `foo = foo` style isn't adding anything IMHO. Feel free not to name the args.
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229738553 --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala --- @@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling // reference-tracking would lead to corrupted output when serializer instances are re-used - for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) { -test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") { - testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking) + for { +referenceTracking <- Set(true, false) --- End diff -- Not that it matters, but I think this should have originally been Seq, not Set.
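The nested `for` in the test suite generates one test per combination of flags. Below is a sketch of the same generator using `Seq`, which fixes the iteration order explicitly, whereas a `Set` makes no such promise in general. It only builds the test names; ScalaTest's `test(...)` registration is left out so the sketch runs on its own.

```scala
// Builds one test name per flag combination; Seq fixes the iteration order explicitly.
val testNames: Seq[String] = for {
  referenceTracking <- Seq(true, false)
  autoReset <- Seq(true, false)
  usePool <- Seq(true, false)
} yield s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking" +
  s", usePool = $usePool"
```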
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229737692 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { --- End diff -- I think you're welcome to just write ... ``` private lazy val factory = new KryoFactory() { override def create: Kryo = newKryo() } ``` but it doesn't matter.
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229737307 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { +override def create: Kryo = { + newKryo() +} + } + + @transient + lazy val pool = new KryoPool.Builder(factory).softReferences.build --- End diff -- `private`?
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229738794 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -30,6 +30,7 @@ import scala.util.control.NonFatal import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutput => KryoUnsafeOutput} +import com.esotericsoftware.kryo.pool._ --- End diff -- I'd spell out the imports for clarity unless it's going to run more than 2 lines or something
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r228970250 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -298,30 +312,40 @@ class KryoDeserializationStream( } } -private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean) +private[spark] class KryoSerializerInstance( + ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean) extends SerializerInstance { /** * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are * not synchronized. */ - @Nullable private[this] var cachedKryo: Kryo = borrowKryo() + @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo() /** * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance; * otherwise, it allocates a new instance. */ private[serializer] def borrowKryo(): Kryo = { -if (cachedKryo != null) { - val kryo = cachedKryo - // As a defensive measure, call reset() to clear any Kryo state that might have been modified - // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue) +if (usePool) { + val kryo = ks.pool.borrow() kryo.reset() - cachedKryo = null kryo } else { - ks.newKryo() + if (cachedKryo != null) { +val kryo = cachedKryo +/** +* As a defensive measure, call reset() to clear any Kryo state that might have --- End diff -- Thanks, fixed.
Github user 10110346 commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r228844982 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -298,30 +312,40 @@ class KryoDeserializationStream( } } -private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean) +private[spark] class KryoSerializerInstance( + ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean) extends SerializerInstance { /** * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are * not synchronized. */ - @Nullable private[this] var cachedKryo: Kryo = borrowKryo() + @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo() /** * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance; * otherwise, it allocates a new instance. */ private[serializer] def borrowKryo(): Kryo = { -if (cachedKryo != null) { - val kryo = cachedKryo - // As a defensive measure, call reset() to clear any Kryo state that might have been modified - // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue) +if (usePool) { + val kryo = ks.pool.borrow() kryo.reset() - cachedKryo = null kryo } else { - ks.newKryo() + if (cachedKryo != null) { +val kryo = cachedKryo +/** +* As a defensive measure, call reset() to clear any Kryo state that might have --- End diff -- The `*` after the first line must be aligned with the first `*` of the first line.
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r228759466 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -298,30 +312,40 @@ class KryoDeserializationStream( } } -private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean) +private[spark] class KryoSerializerInstance( + ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean) extends SerializerInstance { /** * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are * not synchronized. */ - @Nullable private[this] var cachedKryo: Kryo = borrowKryo() + @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo() /** * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance; * otherwise, it allocates a new instance. */ private[serializer] def borrowKryo(): Kryo = { -if (cachedKryo != null) { - val kryo = cachedKryo - // As a defensive measure, call reset() to clear any Kryo state that might have been modified - // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue) +if (usePool) { + val kryo = ks.pool.borrow() kryo.reset() - cachedKryo = null kryo } else { - ks.newKryo() + if (cachedKryo != null) { +val kryo = cachedKryo +/** +* As a defensive measure, call reset() to clear any Kryo state that might have --- End diff -- Sorry, I'm new to this; what is the specific style issue here?
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228759425

--- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import scala.concurrent._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.serializer.KryoTest._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Benchmark for KryoPool vs old "pool of 1".
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class --jars
+ *   2. build/sbt "core/test:runMain "
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain "
+ *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
+ * }}}
+ */
+object KryoSerializerBenchmark extends BenchmarkBase {
+
+  var sc: SparkContext = null
+  val N = 500
+  override def runBenchmarkSuite(): Unit = {
+    val name = "Benchmark KryoPool vs old\"pool of 1\" implementation"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).foreach(usePool => run(usePool, benchmark))
+      benchmark.run()
+    }
+  }
+
+  private def run(usePool: Boolean, benchmark: Benchmark): Unit = {
+    lazy val sc = createSparkContext(usePool)
+
+    benchmark.addCase(s"KryoPool:$usePool") { _ =>
+      val futures = for (_ <- 0 until N) yield {
+        Future {
+          sc.parallelize(0 until 10).map(i => i + 1).count()
+        }
+      }
+
+      val future = Future.sequence(futures)
+
+      ThreadUtils.awaitResult(future, 10.minutes)
+    }
+  }
+
+  def createSparkContext(usePool: Boolean): SparkContext = {
--- End diff --

I'm not sure I understand the question here: this benchmark class doesn't inherit from `SqlBasedBenchmark`, it inherits from `BenchmarkBase`, which has no `getSparkSession` method.
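The benchmark quoted above measures each configuration by launching `N` concurrent jobs as `Future`s and blocking until all of them finish. A stdlib-only sketch of that fan-out/fan-in pattern, assuming a plain function `job` (hypothetical) in place of the Spark job `sc.parallelize(0 until 10).map(i => i + 1).count()`, and `Await.result` in place of Spark's internal `ThreadUtils.awaitResult`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper, not part of the PR: same shape as the benchmark's addCase body.
object FanOutSketch {
  def runConcurrently(n: Int)(job: () => Long): Seq[Long] = {
    // Fan out: start n independent jobs on the global execution context.
    val futures: Seq[Future[Long]] = for (_ <- 0 until n) yield Future(job())
    // Fan in: one Future that completes when every job has completed.
    val all: Future[Seq[Long]] = Future.sequence(futures)
    // Block the calling thread, with the same 10-minute cap the benchmark uses.
    Await.result(all, 10.minutes)
  }
}
```

For example, `FanOutSketch.runConcurrently(500)(() => (0 until 10).map(_ + 1).size.toLong)` runs 500 such jobs. In the real benchmark, each job submits its own Spark job, so many tasks are serialized concurrently, which is where the pool and the "pool of 1" are expected to differ.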
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228716995

--- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -298,30 +312,40 @@ class KryoDeserializationStream(
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
+private[spark] class KryoSerializerInstance(
+    ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
   extends SerializerInstance {
 
   /**
    * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
    * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
    * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
    * not synchronized.
    */
-  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
+  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()
 
   /**
    * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
    * otherwise, it allocates a new instance.
    */
   private[serializer] def borrowKryo(): Kryo = {
-    if (cachedKryo != null) {
-      val kryo = cachedKryo
-      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
-      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
+    if (usePool) {
+      val kryo = ks.pool.borrow()
       kryo.reset()
-      cachedKryo = null
       kryo
     } else {
-      ks.newKryo()
+      if (cachedKryo != null) {
+        val kryo = cachedKryo
+        /**
+        * As a defensive measure, call reset() to clear any Kryo state that might have
--- End diff --

nit for the style.
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228716497

--- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,18 @@ class KryoSerializer(conf: SparkConf)
     new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
   }
+
--- End diff --

nit: extra empty line.
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228716824

--- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import scala.concurrent._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.serializer.KryoTest._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Benchmark for KryoPool vs old "pool of 1".
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class --jars
+ *   2. build/sbt "core/test:runMain "
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain "
+ *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
+ * }}}
+ */
+object KryoSerializerBenchmark extends BenchmarkBase {
+
+  var sc: SparkContext = null
+  val N = 500
+  override def runBenchmarkSuite(): Unit = {
+    val name = "Benchmark KryoPool vs old\"pool of 1\" implementation"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).foreach(usePool => run(usePool, benchmark))
+      benchmark.run()
+    }
+  }
+
+  private def run(usePool: Boolean, benchmark: Benchmark): Unit = {
+    lazy val sc = createSparkContext(usePool)
+
+    benchmark.addCase(s"KryoPool:$usePool") { _ =>
+      val futures = for (_ <- 0 until N) yield {
+        Future {
+          sc.parallelize(0 until 10).map(i => i + 1).count()
+        }
+      }
+
+      val future = Future.sequence(futures)
+
+      ThreadUtils.awaitResult(future, 10.minutes)
+    }
+  }
+
+  def createSparkContext(usePool: Boolean): SparkContext = {
--- End diff --

We add this rather than overriding `getSparkSession` as in `SqlBasedBenchmark`. Is that because setting this conf through a SparkSession doesn't take effect?
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228716844

--- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
@@ -33,6 +33,7 @@ import org.apache.spark.serializer.KryoTest._
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
+
--- End diff --

nit: extra empty line.
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
GitHub user patrickbrownsync opened a pull request:

https://github.com/apache/spark/pull/22855

[SPARK-25839] [Core] Implement use of KryoPool in KryoSerializer

## What changes were proposed in this pull request?

* Implement (optional) use of KryoPool in KryoSerializer, as an alternative to the existing implementation, which caches a single Kryo instance inside each KryoSerializerInstance
* Add the config key spark.kryo.pool, with documentation, to turn this on
* Add the benchmark KryoSerializerBenchmark to compare the new and old implementations
* Add the benchmark results

## How was this patch tested?

Added new tests in KryoSerializerSuite for the pool implementation, and added the pool option to the existing regression tests for SPARK-7766.

This is my original work and I license the work to the project under the project's open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Blyncs/spark kryo-pool

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22855.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22855

commit ce5d13e0673f8574b5795d0d3df59da03118038a
Author: Patrick Brown
Date: 2018-04-06T18:19:52Z

use kryo pool for kryo serializer

commit a4ba88eed6d18d2df5fab609bc28210df9b5a716
Author: Patrick Brown
Date: 2018-04-09T21:20:16Z

fix pool serializable issue

commit 3f1c41ccc451af868a13616065676a5667d597fa
Author: Patrick Brown
Date: 2018-10-26T18:52:07Z

Add option to KryoSerializer to use new KryoPool based implentation
Add tests for new implementation to KryoSerializerSuite.scala
Add benchmark new vs old implemtation in KryoSerializerBenchmark.scala
Add option in Benchmark base for afterAll() shutdown code to facilitate clean benchmark shutdown
Add documentation for spark.kryo.pool configuration option
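The PR contrasts the existing "pool of 1" (one cached Kryo per KryoSerializerInstance, which is not thread-safe) with a KryoPool held by the KryoSerializer and shared across instances. A dependency-free sketch of the two strategies, under stated assumptions: `T` stands in for `Kryo`, `create` for `KryoSerializer.newKryo()`, and a `java.util.concurrent.ConcurrentLinkedQueue` for the pool's backing store. `PoolOfOne` and `SharedPool` are hypothetical names, and this is not Kryo's actual `KryoPool` implementation.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Old strategy ("pool of 1"): each non-thread-safe serializer instance caches at most one
// object. Like `cachedKryo = borrowKryo()` in the quoted diff, it creates one eagerly.
final class PoolOfOne[T >: Null <: AnyRef](create: () => T) {
  private[this] var cached: T = create()

  def borrow(): T = {
    if (cached != null) {
      val t = cached
      cached = null
      t
    } else {
      create()
    }
  }

  def release(t: T): Unit = { cached = t }
}

// New strategy: one thread-safe pool shared by all serializer instances, so an object
// released by one instance (or thread) can be borrowed by another.
final class SharedPool[T >: Null <: AnyRef](create: () => T) {
  private[this] val queue = new ConcurrentLinkedQueue[T]()

  def borrow(): T = {
    val t = queue.poll() // returns null when the pool is empty
    if (t != null) t else create()
  }

  def release(t: T): Unit = { queue.offer(t); () }

  // Loan pattern, similar in spirit to KryoPool.run(callback): always release afterwards.
  def run[R](f: T => R): R = {
    val t = borrow()
    try f(t) finally release(t)
  }
}
```

With the pool of one, an object can only be reused by the SerializerInstance that cached it, so each new SerializerInstance pays for at least one `newKryo()`. With a shared pool, short-lived SerializerInstances can reuse objects released by earlier ones. As far as this thread shows, that creation cost is what the KryoSerializerBenchmark comparison is meant to capture.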