[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r232375758 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -101,13 +101,29 @@ class KryoSerializer(conf: SparkConf) } } - @transient - var pool: KryoPool = getPool + private class PoolWrapper extends KryoPool { +private var pool: KryoPool = getPool + +override def borrow(): Kryo = pool.borrow() + +override def release(kryo: Kryo): Unit = pool.release(kryo) + +override def run[T](kryoCallback: KryoCallback[T]): T = pool.run(kryoCallback) + +def reset(): Unit = { + pool = getPool --- End diff -- The KryoPool interface exposes no way to free it, so I believe that is not necessary --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r232330079 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { --- End diff -- This was needed
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r232330015 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,36 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { +override def create: Kryo = { + newKryo() +} + } + + private class PoolWrapper extends KryoPool { --- End diff -- This wrapper can be removed when Kryo is updated to 5.0 as the new pool implementation exposes the needed method to clear the pool
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r231558355 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -214,8 +230,14 @@ class KryoSerializer(conf: SparkConf) kryo } + override def setDefaultClassLoader(classLoader: ClassLoader): Serializer = { +defaultClassLoader = Some(classLoader) --- End diff -- defaultClassLoader is used in newKryo. I called `getPool` after setting the defaultClassLoader to make sure we don't accidentally create a new Kryo before the defaultClassLoader is updated. I set it on line 105 because I don't believe `setDefaultClassLoader` is required to be called. The issue, which I unfortunately didn't notice until these tests failed, is that the tests specify that you can call `setDefaultClassLoader` after serializing an object (I'm not sure this functionality is actually used), which leaves an "incorrect" Kryo instance in the pool. Unfortunately the pool doesn't expose a way to clear itself out, hence the var, which clearly doesn't work. I will work on a solution.
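The stale-instance problem described above can be sketched without Kryo at all. The following is a minimal illustration, not the code from the PR: `SimplePool`, `ResettablePool`, and `ResetDemo` are hypothetical names, a plain string stands in for a Kryo instance, and a `var` named `loader` stands in for `defaultClassLoader`. It assumes, as the comment says, that the pool offers no way to clear itself, so the wrapper swaps in a fresh pool instead.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical minimal pool standing in for Kryo's KryoPool (not the real API).
final class SimplePool[T <: AnyRef](create: () => T) {
  private val idle = new ConcurrentLinkedQueue[T]()

  // Reuse an idle instance if one exists, otherwise build a new one.
  def borrow(): T = Option(idle.poll()).getOrElse(create())

  def release(item: T): Unit = { idle.offer(item); () }
}

// The pool cannot be emptied, so reset() swaps in a fresh one instead.
final class ResettablePool[T <: AnyRef](create: () => T) {
  @volatile private var pool = new SimplePool[T](create)

  def borrow(): T = pool.borrow()
  def release(item: T): Unit = pool.release(item)
  def reset(): Unit = { pool = new SimplePool[T](create) }
}

object ResetDemo {
  // Returns what is borrowed before the loader change, after it, and after reset().
  def run(): (String, String, String) = {
    var loader = "loader-A" // stands in for defaultClassLoader
    val pool = new ResettablePool[String](() => s"kryo($loader)")

    val first = pool.borrow()
    pool.release(first)

    loader = "loader-B"
    val stale = pool.borrow() // pooled instance still reflects loader-A
    pool.release(stale)

    pool.reset()
    val fresh = pool.borrow() // built after the reset, so it sees loader-B
    (first, stale, fresh)
  }
}
```

One limitation of this sketch: an instance borrowed before `reset()` and released afterwards would still land in the new pool, so it only covers the sequential case described in the comment.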
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r231554649 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,20 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { +override def create: Kryo = { + newKryo() +} + } + + @transient --- End diff -- Yes, you are correct, I don't think this will work
[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...
Github user patrickbrownsync commented on the issue: https://github.com/apache/spark/pull/22855 @srowen I went ahead and fixed those issues where I could find them, mostly for my own learning experience. Thanks for taking the time to review this many times, really appreciate it
[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...
Github user patrickbrownsync commented on the issue: https://github.com/apache/spark/pull/22855 @srowen This should be fixed now. I rebased onto master and changed KryoSerializerBenchmark to conform to how BenchmarkBase had changed on master.
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r230422530 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf) private val avroSchemas = conf.getAvroSchema // whether to use unsafe based IO for serialization private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false) + private val usePool = conf.getBoolean("spark.kryo.pool", false) --- End diff -- Sounds good and done
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r230420266 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf) private val avroSchemas = conf.getAvroSchema // whether to use unsafe based IO for serialization private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false) + private val usePool = conf.getBoolean("spark.kryo.pool", false) --- End diff -- Okay, if I'm understanding you correctly, I should set the default to true and remove the documentation. I will go ahead and do that; if I misunderstood, let me know. Thanks
[GitHub] spark issue #22883: [SPARK-25837] [Core] Fix potential slowdown in AppStatus...
Github user patrickbrownsync commented on the issue: https://github.com/apache/spark/pull/22883 I would be happy to start thinking about that. Can you provide more details on what you mean by data loss on write in the InMemoryStore? I now see that the types being inserted have an annotation around which fields they are allowed to be indexed by, but I don't have a good idea of how you would allow multiple indexes in a collection short of running something like SQLite in memory.
[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...
Github user patrickbrownsync commented on the issue: https://github.com/apache/spark/pull/22855 @srowen Thanks for taking the time to give me feedback! I'll get this stuff fixed
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229763892 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf) private val avroSchemas = conf.getAvroSchema // whether to use unsafe based IO for serialization private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false) + private val usePool = conf.getBoolean("spark.kryo.pool", false) --- End diff -- I had already added this to docs/configuration.md. Should I remove it?
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229763610 --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala --- @@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling // reference-tracking would lead to corrupted output when serializer instances are re-used - for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) { -test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") { - testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking) + for { +referenceTracking <- Set(true, false) --- End diff -- Might as well fix while I'm here
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229762873 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { --- End diff -- Thanks, I'll remove it. I had a problem previously, but it looks like the @transient on the pool is enough by itself.
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r229762474 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf) new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) } + @transient + private lazy val factory: KryoFactory = new KryoFactory() { +override def create: Kryo = { + newKryo() +} + } + + @transient + lazy val pool = new KryoPool.Builder(factory).softReferences.build --- End diff -- This is used directly by KryoSerializerInstance
[GitHub] spark pull request #22883: [SPARK-25837] [Core] Fix potential slowdown in Ap...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22883#discussion_r229539016 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -1105,6 +1095,15 @@ private[spark] class AppStatusListener( cleanupCachedQuantiles(key) } + +// Delete tasks for all stages in one pass, as deleting them for each stage individually is slow --- End diff --

Sure. Take a look at the implementation of InMemoryView at spark/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java line 179, specifically the implementation of iterator on line 193. Here is an excerpt:

```
Collections.sort(sorted, (e1, e2) -> modifier * compare(e1, e2, getter));
Stream stream = sorted.stream();

if (first != null) {
  stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
}

if (last != null) {
  stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
}
```

and the original, in-loop deletion code:

```
val tasks = kvstore.view(classOf[TaskDataWrapper])
  .index("stage")
  .first(key)
  .last(key)
  .asScala

tasks.foreach { t =>
  kvstore.delete(t.getClass(), t.taskId)
}
```

So you can see that if we do this on each loop iteration, we sort the whole collection of TaskDataWrapper objects currently in the store, then go through and check each item against the key (the stage). Assuming we have a large number of stages and tasks, this is effectively an O(n^2) operation, because every stage triggers a sort and scan over all tasks. This is what happens in my production application and in the repro code. If we do this in one pass for all stages, we only sort and iterate the list of tasks one time.

This same pattern occurs fairly frequently with the KVStoreView interface and InMemoryView implementation. Since I am new to contributing to Spark I did not undertake a massive refactor, but I would suggest that this interface and implementation be looked at and redesigned with efficiency in mind. The current implementation favors flexibility in how the dataset is sorted and filtered, but enforcing a single sort order via something like a SortedSet would hopefully make it clear when an operation is searching efficiently inside the collection and when it is using an inefficient access pattern.

I hope that explains the reasoning. If you have any more questions, let me know.
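To make the cost difference concrete, here is a small self-contained sketch that counts how many elements each strategy touches. It is an illustration only and does not use the KVStore API: `Task` is a hypothetical stand-in for TaskDataWrapper, and a plain `Vector` stands in for the in-memory store. The per-stage variant mimics the excerpt above by sorting and scanning every remaining task once per stage.

```scala
// Hypothetical stand-in for TaskDataWrapper: only the fields needed here.
final case class Task(taskId: Long, stageId: Int)

object CleanupSketch {
  // Per-stage cleanup: each stage triggers a full sort and scan of the remaining
  // tasks. Returns the surviving tasks and the number of elements scanned.
  def deletePerStage(tasks: Vector[Task], stages: Seq[Int]): (Vector[Task], Long) = {
    var remaining = tasks
    var scanned = 0L
    stages.foreach { stage =>
      val sorted = remaining.sortBy(_.stageId)
      scanned += sorted.size
      remaining = sorted.filterNot(_.stageId == stage)
    }
    (remaining, scanned)
  }

  // Single-pass cleanup: collect the stage ids once, then scan the tasks once.
  def deleteSinglePass(tasks: Vector[Task], stages: Seq[Int]): (Vector[Task], Long) = {
    val doomed = stages.toSet
    val remaining = tasks.filterNot(t => doomed.contains(t.stageId))
    (remaining, tasks.size.toLong)
  }
}
```

With 10 stages of 10 tasks each and 5 stages to clean up, the per-stage variant scans 100 + 90 + 80 + 70 + 60 = 400 elements, while the single pass scans 100. The gap grows with the number of stages being cleaned up, which matches the slowdown described in the comment.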
[GitHub] spark pull request #22883: [SPARK-25837] [Core] Fix potential slowdown in Ap...
GitHub user patrickbrownsync opened a pull request: https://github.com/apache/spark/pull/22883

[SPARK-25837] [Core] Fix potential slowdown in AppStatusListener when cleaning up stages

## What changes were proposed in this pull request?

* Update the `AppStatusListener` `cleanupStages` method to remove tasks for those stages in a single pass instead of one pass per stage.
* This fixes an issue where the `cleanupStages` method would get backed up, causing a backup in the executor in ElementTrackingStore, resulting in stages and jobs not getting cleaned up properly.

Tasks seem most susceptible to this as there are a lot of them; however, a similar issue could arise in other locations where the `KVStore` `view` method is used. A broader fix might involve updates to `KVStoreView` and `InMemoryView`, as it appears this interface and implementation can lead to multiple and inefficient traversals of the stored data.

## How was this patch tested?

Using existing tests in AppStatusListenerSuite

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Blyncs/spark cleanup-stages-fix

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22883.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22883

commit 4bfdac025ff0906316cf7697933a7b374ae3b427
Author: Patrick Brown
Date: 2018-10-29T19:49:50Z
Update cleanupStages in AppStatusListener to delete tasks for all stages in a single pass

commit 178f7c3bf82f93177fce086037ece6ebf09bb350
Author: Patrick Brown
Date: 2018-10-29T19:55:38Z
remove uneeded type
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r228970250 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -298,30 +312,40 @@ class KryoDeserializationStream( } } -private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean) +private[spark] class KryoSerializerInstance( + ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean) extends SerializerInstance { /** * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are * not synchronized. */ - @Nullable private[this] var cachedKryo: Kryo = borrowKryo() + @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo() /** * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance; * otherwise, it allocates a new instance. */ private[serializer] def borrowKryo(): Kryo = { -if (cachedKryo != null) { - val kryo = cachedKryo - // As a defensive measure, call reset() to clear any Kryo state that might have been modified - // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue) +if (usePool) { + val kryo = ks.pool.borrow() kryo.reset() - cachedKryo = null kryo } else { - ks.newKryo() + if (cachedKryo != null) { +val kryo = cachedKryo +/** +* As a defensive measure, call reset() to clear any Kryo state that might have --- End diff -- Thanks, fixed
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r228759466 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -298,30 +312,40 @@ class KryoDeserializationStream( } } -private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean) +private[spark] class KryoSerializerInstance( + ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean) extends SerializerInstance { /** * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are * not synchronized. */ - @Nullable private[this] var cachedKryo: Kryo = borrowKryo() + @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo() /** * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance; * otherwise, it allocates a new instance. */ private[serializer] def borrowKryo(): Kryo = { -if (cachedKryo != null) { - val kryo = cachedKryo - // As a defensive measure, call reset() to clear any Kryo state that might have been modified - // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue) +if (usePool) { + val kryo = ks.pool.borrow() kryo.reset() - cachedKryo = null kryo } else { - ks.newKryo() + if (cachedKryo != null) { +val kryo = cachedKryo +/** +* As a defensive measure, call reset() to clear any Kryo state that might have --- End diff -- Sorry, I'm new to this. What is the specific style issue here?
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
Github user patrickbrownsync commented on a diff in the pull request: https://github.com/apache/spark/pull/22855#discussion_r228759425 --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import scala.concurrent._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.serializer.KryoTest._ +import org.apache.spark.util.ThreadUtils + +/** + * Benchmark for KryoPool vs old "pool of 1". + * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class --jars + * 2. build/sbt "core/test:runMain " + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain " + * Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt". + * }}} + */ +object KryoSerializerBenchmark extends BenchmarkBase { + + var sc: SparkContext = null + val N = 500 + override def runBenchmarkSuite(): Unit = { +val name = "Benchmark KryoPool vs old\"pool of 1\" implementation" +runBenchmark(name) { + val benchmark = new Benchmark(name, N, 10, output = output) + Seq(true, false).foreach(usePool => run(usePool, benchmark)) + benchmark.run() +} + } + + private def run(usePool: Boolean, benchmark: Benchmark): Unit = { +lazy val sc = createSparkContext(usePool) + +benchmark.addCase(s"KryoPool:$usePool") { _ => + val futures = for (_ <- 0 until N) yield { +Future { + sc.parallelize(0 until 10).map(i => i + 1).count() +} + } + + val future = Future.sequence(futures) + + ThreadUtils.awaitResult(future, 10.minutes) +} + } + + def createSparkContext(usePool: Boolean): SparkContext = { --- End diff -- I'm not sure I understand the question here. This benchmark class doesn't inherit from `SqlBasedBenchmark`; it inherits from `BenchmarkBase`, which has no `getSparkSession` method.
[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...
GitHub user patrickbrownsync opened a pull request: https://github.com/apache/spark/pull/22855

[SPARK-25839] [Core] Implement use of KryoPool in KryoSerializer

## What changes were proposed in this pull request?

* Implement (optional) use of KryoPool in KryoSerializer, an alternative to the existing implementation of caching a Kryo instance inside KryoSerializerInstance
* Add config key & documentation of spark.kryo.pool in order to turn this on
* Add benchmark KryoSerializerBenchmark to compare new and old implementation
* Add results of benchmark

## How was this patch tested?

Added new tests inside KryoSerializerSuite to test the pool implementation as well as added the pool option to the existing regression testing for SPARK-7766

This is my original work and I license the work to the project under the project's open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Blyncs/spark kryo-pool

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22855.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22855

commit ce5d13e0673f8574b5795d0d3df59da03118038a
Author: Patrick Brown
Date: 2018-04-06T18:19:52Z
use kryo pool for kryo serializer

commit a4ba88eed6d18d2df5fab609bc28210df9b5a716
Author: Patrick Brown
Date: 2018-04-09T21:20:16Z
fix pool serializable issue

commit 3f1c41ccc451af868a13616065676a5667d597fa
Author: Patrick Brown
Date: 2018-10-26T18:52:07Z
Add option to KryoSerializer to use new KryoPool based implentation
Add tests for new implementation to KryoSerializerSuite.scala
Add benchmark new vs old implemtation in KryoSerializerBenchmark.scala
Add option in Benchmark base for afterAll() shutdown code to facilitate clean benchmark shutdown
Add documentation for spark.kryo.pool configuration option
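The difference between the two strategies named in the description, a cache inside each serializer instance versus a pool shared across them, can be sketched by counting how many objects get built. This is a rough illustration under stated assumptions, not the PR's implementation: `FakeKryo` and `ReuseSketch` are hypothetical names, the work is sequential, and it assumes that constructing the Kryo-like object is the expensive step. The actual change builds its pool with `new KryoPool.Builder(factory).softReferences.build`, as quoted in the review thread.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an expensive-to-build Kryo instance.
final class FakeKryo(val id: Int)

object ReuseSketch {
  // "Pool of one": every short-lived serializer instance builds and caches its own
  // FakeKryo, so n serializer instances mean n constructions.
  def perInstanceCache(serializerInstances: Int): Int = {
    val created = new AtomicInteger(0)
    def newKryo(): FakeKryo = new FakeKryo(created.incrementAndGet())
    (0 until serializerInstances).foreach { _ =>
      val cached = newKryo() // built when the serializer instance is created
      require(cached.id > 0) // placeholder for the actual serialization work
    }
    created.get()
  }

  // Shared pool: serializer instances borrow from and release to one queue, so
  // sequential use keeps reusing the same FakeKryo.
  def sharedPool(serializerInstances: Int): Int = {
    val created = new AtomicInteger(0)
    def newKryo(): FakeKryo = new FakeKryo(created.incrementAndGet())
    val idle = new ConcurrentLinkedQueue[FakeKryo]()
    (0 until serializerInstances).foreach { _ =>
      val kryo = Option(idle.poll()).getOrElse(newKryo()) // borrow
      require(kryo.id > 0)                                 // placeholder work
      idle.offer(kryo)                                     // release
    }
    created.get()
  }
}
```

Under concurrent use the shared pool would build as many instances as there are simultaneous borrowers at peak, so the count of one here reflects only the sequential case.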