[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-09 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r232375758
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -101,13 +101,29 @@ class KryoSerializer(conf: SparkConf)
 }
   }
 
-  @transient
-  var pool: KryoPool = getPool
+  private class PoolWrapper extends KryoPool {
+    private var pool: KryoPool = getPool
+
+    override def borrow(): Kryo = pool.borrow()
+
+    override def release(kryo: Kryo): Unit = pool.release(kryo)
+
+    override def run[T](kryoCallback: KryoCallback[T]): T = pool.run(kryoCallback)
+
+    def reset(): Unit = {
+      pool = getPool
--- End diff --

The KryoPool interface exposes no way to free it, so I believe that is not 
necessary
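
For context on why GC suffices here (a sketch, assuming `getPool` builds the Kryo 4 pool with soft references, as the diff later in this thread shows):

```
// Kryo 4's KryoPool exposes only borrow/release/run, no close() or free().
// Once reset() rebinds `pool`, the old KryoPool becomes unreachable and its
// softly-referenced Kryo instances can be reclaimed under memory pressure.
private def getPool: KryoPool = new KryoPool.Builder(factory).softReferences.build
```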


---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-09 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r232330079
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
--- End diff --

This was needed


---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-09 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r232330015
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,36 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
+    override def create: Kryo = {
+      newKryo()
+    }
+  }
+
+  private class PoolWrapper extends KryoPool {
--- End diff --

This wrapper can be removed when Kryo is updated to 5.0 as the new pool 
implementation exposes the needed method to clear the pool
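
For reference, a hedged sketch of the Kryo 5 API this anticipates (names per Kryo 5's `com.esotericsoftware.kryo.util.Pool`; Spark still used Kryo 4 at this point, so treat the details as assumptions):

```
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.util.Pool

// threadSafe = true, softReferences = true, maximumCapacity = 8
val pool: Pool[Kryo] = new Pool[Kryo](true, true, 8) {
  override protected def create(): Kryo = newKryo()
}
val kryo = pool.obtain()
pool.free(kryo)
pool.clear() // drops all pooled instances, so no PoolWrapper is needed
```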


---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-07 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r231558355
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -214,8 +230,14 @@ class KryoSerializer(conf: SparkConf)
 kryo
   }
 
+  override def setDefaultClassLoader(classLoader: ClassLoader): Serializer = {
+    defaultClassLoader = Some(classLoader)
--- End diff --

defaultClassLoader is used in newKryo.

I called `getPool` after setting the defaultClassLoader to make sure we 
don't accidentally create a new Kryo before the defaultClassLoader is updated. 
Setting it on 105 was because I don't believe `setDefaultClassLoader` is 
required to be called.

The issue that I unfortunately didn't notice until these tests failed is 
that the tests specify that you can call `setDefaultClassLoader` after serializing 
an object (I'm not sure this functionality is actually used), leaving an 
"incorrect" Kryo instance in the pool. Unfortunately the pool doesn't 
expose a way to clear itself out, hence the var, which clearly doesn't work.

I will work on a solution
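
For context, a sketch of the direction this landed on, consistent with the `PoolWrapper.reset()` shown in the 2018-11-09 comments above (`internalPool` is an assumed field name; the merged code may differ):

```
override def setDefaultClassLoader(classLoader: ClassLoader): Serializer = {
  defaultClassLoader = Some(classLoader)
  // Drop pooled Kryo instances that were built with the old classloader.
  internalPool.reset()
  this
}
```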


---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-07 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r231554649
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,20 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
+    override def create: Kryo = {
+      newKryo()
+    }
+  }
+
+  @transient
--- End diff --

Yes, you are correct, I don't think this will work


---




[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

2018-11-06 Thread patrickbrownsync
Github user patrickbrownsync commented on the issue:

https://github.com/apache/spark/pull/22855
  
@srowen I went ahead and fixed those issues where I could find them, mostly 
for my own learning experience. Thanks for taking the time to review this so many 
times; I really appreciate it.


---




[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

2018-11-06 Thread patrickbrownsync
Github user patrickbrownsync commented on the issue:

https://github.com/apache/spark/pull/22855
  
@srowen This should be fixed now: I rebased off master and changed 
KryoSerializerBenchmark to conform to how BenchmarkBase had changed on master.


---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-02 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r230422530
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
   private val avroSchemas = conf.getAvroSchema
   // whether to use unsafe based IO for serialization
   private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
+  private val usePool = conf.getBoolean("spark.kryo.pool", false)
--- End diff --

Sounds good and done


---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-02 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r230420266
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
   private val avroSchemas = conf.getAvroSchema
   // whether to use unsafe based IO for serialization
   private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
+  private val usePool = conf.getBoolean("spark.kryo.pool", false)
--- End diff --

Okay, if I'm understanding you correctly, I should set the default to true 
and remove the documentation. I will go ahead and do that; if I misunderstood, 
let me know. Thanks
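
A usage sketch (the `spark.kryo.pool` key is from this PR; everything else is illustrative):

```
import org.apache.spark.SparkConf

// With the default flipped to true, the pool is on unless a user opts out.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.pool", "false") // fall back to the old "pool of one" behavior
```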


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22883: [SPARK-25837] [Core] Fix potential slowdown in AppStatus...

2018-10-31 Thread patrickbrownsync
Github user patrickbrownsync commented on the issue:

https://github.com/apache/spark/pull/22883
  
I would be happy to start thinking about that.

Can you provide more details on what you mean by data loss on write in the 
InMemoryStore?

I now see that the types being inserted have an annotation specifying which 
fields they are allowed to be indexed by, but I don't have a good idea of how 
you would allow multiple indexes in a collection short of running something 
like SQLite in memory (a rough sketch of one alternative follows).
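
To make the question concrete, here is a minimal sketch (not Spark code) of what "multiple indexes" could look like without an embedded database: one sorted map per indexed field, all pointing at the same stored values.

```
import scala.collection.mutable

class MultiIndexStore[V](indexFns: Map[String, V => Long]) {
  private val data = mutable.HashMap.empty[Long, V] // primary: id -> value
  private val indexes = indexFns.map { case (name, _) =>
    name -> mutable.TreeMap.empty[Long, mutable.Set[Long]] // index key -> ids
  }

  def put(id: Long, value: V): Unit = {
    data(id) = value
    indexFns.foreach { case (name, fn) =>
      indexes(name).getOrElseUpdate(fn(value), mutable.Set.empty) += id
    }
  }

  // All values whose index key falls in [first, last], without a full scan or sort.
  def range(index: String, first: Long, last: Long): Iterator[V] =
    indexes(index).range(first, last + 1).valuesIterator.flatMap(_.iterator).map(data)
}
```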


---




[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

2018-10-31 Thread patrickbrownsync
Github user patrickbrownsync commented on the issue:

https://github.com/apache/spark/pull/22855
  
@srowen Thanks for taking the time to give me feedback! I'll get this stuff 
fixed


---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229763892
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
   private val avroSchemas = conf.getAvroSchema
   // whether to use unsafe based IO for serialization
   private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
+  private val usePool = conf.getBoolean("spark.kryo.pool", false)
--- End diff --

I had already added this to docs/configuration.md; should I remove it?


---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229763610
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
@@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with 
SharedSparkContext {
 
   // Regression test for SPARK-7766, an issue where disabling auto-reset 
and enabling
   // reference-tracking would lead to corrupted output when serializer 
instances are re-used
-  for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
-    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
-      testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)
+  for {
+    referenceTracking <- Set(true, false)
--- End diff --

Might as well fix while I'm here
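
For reference, a plausible final shape with the pool option added (a sketch; the merged test may differ):

```
for {
  referenceTracking <- Set(true, false)
  autoReset <- Set(true, false)
  usePool <- Set(true, false)
} {
  test(s"instance reuse with autoReset = $autoReset, referenceTracking = " +
    s"$referenceTracking, usePool = $usePool") {
    testSerializerInstanceReuse(autoReset, referenceTracking, usePool)
  }
}
```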


---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229762873
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
--- End diff --

Thanks, I'll remove it. I had a problem previously, but it looks like the 
@transient on the pool is enough by itself.


---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229762474
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
+    override def create: Kryo = {
+      newKryo()
+    }
+  }
+
+  @transient
+  lazy val pool = new KryoPool.Builder(factory).softReferences.build
--- End diff --

This is used directly by KryoSerializerInstance


---




[GitHub] spark pull request #22883: [SPARK-25837] [Core] Fix potential slowdown in Ap...

2018-10-30 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22883#discussion_r229539016
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -1105,6 +1095,15 @@ private[spark] class AppStatusListener(
 
   cleanupCachedQuantiles(key)
 }
+
+    // Delete tasks for all stages in one pass, as deleting them for each stage individually is slow
--- End diff --

Sure

Take a look at the implementation of InMemoryView at 
spark/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 line 179

specifically the implementation of iterator on line 193, here is an excerpt:

```
Collections.sort(sorted, (e1, e2) -> modifier * compare(e1, e2, getter));
Stream<T> stream = sorted.stream();

if (first != null) {
  stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
}

if (last != null) {
  stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
}
```

and the original, in loop deletion code:

```
val tasks = kvstore.view(classOf[TaskDataWrapper])
  .index("stage")
  .first(key)
  .last(key)
  .asScala

tasks.foreach { t =>
  kvstore.delete(t.getClass(), t.taskId)
}
```

So you can see, if we do this on each loop iteration we actually sort the whole 
collection of TaskDataWrapper objects currently in the store, then go through 
and check each item against the key set (the stage). Assuming we have a large 
number of stages and tasks, this is an O(n^2) operation, which is what happens 
in my production application and in the repro code.

If we do this in one pass for all stages, we only sort and iterate the list 
of tasks one time.
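
A sketch of the single-pass shape described here (field and method names assumed from the surrounding discussion; the merged change may differ in detail):

```
// Collect the keys of every stage being cleaned up, then walk the task
// view once, deleting tasks whose (stageId, attemptId) is in that set.
val stageKeys = stages.map(s => (s.info.stageId, s.info.attemptId)).toSet
kvstore.view(classOf[TaskDataWrapper]).asScala.foreach { t =>
  if (stageKeys.contains((t.stageId, t.stageAttemptId))) {
    kvstore.delete(t.getClass(), t.taskId)
  }
}
```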

This same pattern occurs fairly frequently with the KVStoreView interface 
and the InMemoryView implementation. Since I am new to contributing to Spark I did 
not undertake a massive refactor, but I would suggest that this interface and 
implementation be looked at and redesigned with efficiency in mind. The 
current implementation favors flexibility in how the dataset is sorted 
and filtered, but enforcing a single sort order via something like a SortedSet 
would hopefully make it clear when an operation is efficiently searching inside 
the collection and when it is using an inefficient access pattern.

I hope that explains the reasoning, if you have any more questions let me 
know.


---




[GitHub] spark pull request #22883: [SPARK-25837] [Core] Fix potential slowdown in Ap...

2018-10-29 Thread patrickbrownsync
GitHub user patrickbrownsync opened a pull request:

https://github.com/apache/spark/pull/22883

[SPARK-25837] [Core] Fix potential slowdown in AppStatusListener when 
cleaning up stages

## What changes were proposed in this pull request?

* Update the `AppStatusListener` `cleanupStages` method to remove tasks for 
those stages in a single pass instead of one pass per stage.
* This fixes an issue where the cleanupStages method would get backed up, 
causing a backlog in the ElementTrackingStore executor and resulting in stages 
and jobs not getting cleaned up properly.

Tasks seem most susceptible to this as there are a lot of them; however, a 
similar issue could arise in other places where the `KVStore` `view` method is 
used. A broader fix might involve updates to `KVStoreView` and `InMemoryView`, 
as it appears this interface and implementation can lead to multiple, 
inefficient traversals of the stored data.

## How was this patch tested?

Using existing tests in AppStatusListenerSuite


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Blyncs/spark cleanup-stages-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22883.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22883


commit 4bfdac025ff0906316cf7697933a7b374ae3b427
Author: Patrick Brown 
Date:   2018-10-29T19:49:50Z

Update cleanupStages in AppStatusListener to delete tasks for all stages in 
a single pass

commit 178f7c3bf82f93177fce086037ece6ebf09bb350
Author: Patrick Brown 
Date:   2018-10-29T19:55:38Z

remove unneeded type




---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-29 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228970250
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -298,30 +312,40 @@ class KryoDeserializationStream(
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
+private[spark] class KryoSerializerInstance(
+   ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
   extends SerializerInstance {
   /**
    * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
    * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
    * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
    * not synchronized.
    */
-  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
+  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()

   /**
    * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
    * otherwise, it allocates a new instance.
    */
   private[serializer] def borrowKryo(): Kryo = {
-    if (cachedKryo != null) {
-      val kryo = cachedKryo
-      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
-      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
+    if (usePool) {
+      val kryo = ks.pool.borrow()
       kryo.reset()
-      cachedKryo = null
       kryo
     } else {
-      ks.newKryo()
+      if (cachedKryo != null) {
+        val kryo = cachedKryo
+        /**
+        * As a defensive measure, call reset() to clear any Kryo state that might have
--- End diff --

Thanks fixed


---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-28 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228759466
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -298,30 +312,40 @@ class KryoDeserializationStream(
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
+private[spark] class KryoSerializerInstance(
+   ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
   extends SerializerInstance {
   /**
    * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
    * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
    * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
    * not synchronized.
    */
-  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
+  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()

   /**
    * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
    * otherwise, it allocates a new instance.
    */
   private[serializer] def borrowKryo(): Kryo = {
-    if (cachedKryo != null) {
-      val kryo = cachedKryo
-      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
-      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
+    if (usePool) {
+      val kryo = ks.pool.borrow()
       kryo.reset()
-      cachedKryo = null
       kryo
     } else {
-      ks.newKryo()
+      if (cachedKryo != null) {
+        val kryo = cachedKryo
+        /**
+        * As a defensive measure, call reset() to clear any Kryo state that might have
--- End diff --

Sorry, I'm new to this; what is the specific style issue here?
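
Judging from the fix acknowledged in the later reply, the nit appears to be the scaladoc-style `/** ... */` used for an in-method comment; Spark style would be a plain `//` comment (an inference, since the reviewer's remark isn't quoted here):

```
val kryo = cachedKryo
// As a defensive measure, call reset() to clear any Kryo state that might
// have been modified by the last operation to borrow this instance (SPARK-7766).
kryo.reset()
```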


---




[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-28 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228759425
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import scala.concurrent._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.serializer.KryoTest._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Benchmark for KryoPool vs old "pool of 1".
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *  bin/spark-submit --class  --jars 
+ *   2. build/sbt "core/test:runMain "
+ *   3. generate result:
+ *  SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain "
+ *  Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
+ * }}}
+ */
+object KryoSerializerBenchmark extends BenchmarkBase {
+
+  var sc: SparkContext = null
+  val N = 500
+  override def runBenchmarkSuite(): Unit = {
+    val name = "Benchmark KryoPool vs old \"pool of 1\" implementation"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).foreach(usePool => run(usePool, benchmark))
+      benchmark.run()
+    }
+  }
+
+  private def run(usePool: Boolean, benchmark: Benchmark): Unit = {
+    lazy val sc = createSparkContext(usePool)
+
+    benchmark.addCase(s"KryoPool:$usePool") { _ =>
+      val futures = for (_ <- 0 until N) yield {
+        Future {
+          sc.parallelize(0 until 10).map(i => i + 1).count()
+        }
+      }
+
+      val future = Future.sequence(futures)
+
+      ThreadUtils.awaitResult(future, 10.minutes)
+    }
+  }
+
+  def createSparkContext(usePool: Boolean): SparkContext = {
--- End diff --

I'm not sure I understand the question here: this benchmark class doesn't 
inherit from `SqlBasedBenchmark`; it inherits from `BenchmarkBase`, which has no 
`getSparkSession` method.

---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-26 Thread patrickbrownsync
GitHub user patrickbrownsync opened a pull request:

https://github.com/apache/spark/pull/22855

[SPARK-25839] [Core] Implement use of KryoPool in KryoSerializer

## What changes were proposed in this pull request?

* Implement (optional) use of KryoPool in KryoSerializer, an alternative to 
the existing implementation of caching a Kryo instance inside 
KryoSerializerInstance
* Add config key & documentation of spark.kryo.pool in order to turn this on
* Add benchmark KryoSerializerBenchmark to compare new and old 
implementation
* Add results of benchmark

## How was this patch tested?

Added new tests inside KryoSerializerSuite to test the pool implementation, 
and added the pool option to the existing regression testing for 
SPARK-7766.


This is my original work and I license the work to the project under the 
project’s open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Blyncs/spark kryo-pool

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22855.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22855


commit ce5d13e0673f8574b5795d0d3df59da03118038a
Author: Patrick Brown 
Date:   2018-04-06T18:19:52Z

use kryo pool for kryo serializer

commit a4ba88eed6d18d2df5fab609bc28210df9b5a716
Author: Patrick Brown 
Date:   2018-04-09T21:20:16Z

fix pool serializable issue

commit 3f1c41ccc451af868a13616065676a5667d597fa
Author: Patrick Brown 
Date:   2018-10-26T18:52:07Z

Add option to KryoSerializer to use new KryoPool based implementation
Add tests for new implementation to KryoSerializerSuite.scala
Add benchmark of new vs old implementation in KryoSerializerBenchmark.scala
Add option in Benchmark base for afterAll() shutdown code to facilitate 
clean benchmark shutdown
Add documentation for spark.kryo.pool configuration option




---
