[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16867





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-23 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r107812908
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
--- End diff --

super nit: can you combine these into one import
(`import java.util.{Arrays, NoSuchElementException}`)
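
For clarity, the before/after of that suggestion (both forms appear in this thread):

```scala
// Before: two separate imports
import java.util.Arrays
import java.util.NoSuchElementException

// After: combined into one import selector
import java.util.{Arrays, NoSuchElementException}
```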





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-23 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r107812876
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+intercept[NoSuchElementException] {
+  medianHeap.median
+}
+  }
+
+  test("Median should be correct when size of MedianHeap is even") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 10)
+assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
--- End diff --

instead of indexing into the array, I think it would be clearer here to 
just hard-code 4.5 (it's easier to see that the median is 4.5 than to reason 
about the indices in the array)
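
A sketch of the suggested assertion (the inserted values are 0 through 9, so the two middle values average to 4.5):

```scala
// Hard-coded expected median: (4 + 5) / 2.0 = 4.5 for the inputs 0..9
assert(medianHeap.median === 4.5)
```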





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-23 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r107812807
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+intercept[NoSuchElementException] {
+  medianHeap.median
+}
+  }
+
+  test("Median should be correct when size of MedianHeap is even") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 10)
+assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
+  }
+
+  test("Median should be correct when size of MedianHeap is odd") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 9)
+assert(medianHeap.median === (array(4)))
+  }
+
+  test("Median should be correct though there are duplicated numbers 
inside.") {
+val array = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4)
--- End diff --

Can you change this to something like:

Array(0, 0, 1, 1, 2, 3, 4)?

Otherwise the median heap could be handling the duplicates wrong (e.g., by
not actually inserting duplicates), and the assertion at the bottom would
still hold. Then the check at the end can be `medianHeap.median === 1`.
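
A sketch of the revised test under that suggestion (names follow the quoted suite):

```scala
test("Median should be correct though there are duplicated numbers inside.") {
  val array = Array(0, 0, 1, 1, 2, 3, 4)
  val medianHeap = new MedianHeap()
  array.foreach(medianHeap.insert(_))
  assert(medianHeap.size === 7)
  // With an odd count, silently dropped duplicates would shift the middle element away from 1
  assert(medianHeap.median === 1)
}
```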





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-23 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r107812986
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+intercept[NoSuchElementException] {
+  medianHeap.median
+}
+  }
+
+  test("Median should be correct when size of MedianHeap is even") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 10)
+assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
+  }
+
+  test("Median should be correct when size of MedianHeap is odd") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 9)
+assert(medianHeap.median === (array(4)))
+  }
+
+  test("Median should be correct though there are duplicated numbers 
inside.") {
+val array = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size === 10)
+assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
+  }
+
+  test("Median should be correct when skew situations.") {
--- End diff --

"when skew situations" --> "when input data is skewed"





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-23 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r107812930
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+intercept[NoSuchElementException] {
+  medianHeap.median
+}
+  }
+
+  test("Median should be correct when size of MedianHeap is even") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 10)
+assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
+  }
+
+  test("Median should be correct when size of MedianHeap is odd") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 9)
+assert(medianHeap.median === (array(4)))
--- End diff --

similarly here -- just `medianHeap.median === 4`





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-23 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r107812584
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -893,6 +893,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
 val taskSet = FakeTask.createTaskSet(4)
 // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
 sc.conf.set("spark.speculation.multiplier", "0.0")
+sc.conf.set("spark.speculation", "true")
--- End diff --

Ohhh cool that makes sense





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106833748
  
--- Diff: core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable.PriorityQueue
+
+/**
+ * MedianHeap is designed to be used to quickly track the median of a group of numbers
+ * that may contain duplicates. Inserting a new number has O(log n) time complexity and
+ * determining the median has O(1) time complexity.
+ * The basic idea is to maintain two heaps: a smallerHalf and a largerHalf. The smallerHalf
+ * stores the smaller half of all numbers while the largerHalf stores the larger half.
+ * The sizes of two heaps need to be balanced each time when a new number is inserted so
+ * that their sizes will not be different by more than 1. Therefore each time when
+ * findMedian() is called we check if two heaps have the same size. If they do, we should
+ * return the average of the two top values of heaps. Otherwise we return the top of the
+ * heap which has one more element.
+ */
+
--- End diff --

nit: delete this blank line





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106833737
  
--- Diff: core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable.PriorityQueue
+
+/**
+ * MedianHeap is designed to be used to quickly track the median of a group of numbers
+ * that may contain duplicates. Inserting a new number has O(log n) time complexity and
+ * determining the median has O(1) time complexity.
+ * The basic idea is to maintain two heaps: a smallerHalf and a largerHalf. The smallerHalf
+ * stores the smaller half of all numbers while the largerHalf stores the larger half.
+ * The sizes of two heaps need to be balanced each time when a new number is inserted so
+ * that their sizes will not be different by more than 1. Therefore each time when
+ * findMedian() is called we check if two heaps have the same size. If they do, we should
+ * return the average of the two top values of heaps. Otherwise we return the top of the
+ * heap which has one more element.
+ */
+
+private[spark] class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a smallerHalf,
+  // i.e median is the maximum, at the root
+  private[this] var smallerHalf = PriorityQueue.empty[Double](ord)
--- End diff --

very minor -- could you make this comment a doc comment with `/**`?  Even
though it's private, I find that helpful, since IDEs will show this text when
you hover over a reference.
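
A sketch of the suggested doc-comment form on the quoted field:

```scala
/**
 * Stores all the numbers less than the current median in a smallerHalf,
 * i.e. the median is the maximum, at the root.
 */
private[this] var smallerHalf = PriorityQueue.empty[Double](ord)
```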





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106833868
  
--- Diff: core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable.PriorityQueue
+
+/**
+ * MedianHeap is designed to be used to quickly track the median of a group of numbers
+ * that may contain duplicates. Inserting a new number has O(log n) time complexity and
+ * determining the median has O(1) time complexity.
+ * The basic idea is to maintain two heaps: a smallerHalf and a largerHalf. The smallerHalf
+ * stores the smaller half of all numbers while the largerHalf stores the larger half.
+ * The sizes of two heaps need to be balanced each time when a new number is inserted so
+ * that their sizes will not be different by more than 1. Therefore each time when
+ * findMedian() is called we check if two heaps have the same size. If they do, we should
+ * return the average of the two top values of heaps. Otherwise we return the top of the
+ * heap which has one more element.
+ */
+
+private[spark] class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a smallerHalf,
+  // i.e median is the maximum, at the root
+  private[this] var smallerHalf = PriorityQueue.empty[Double](ord)
+
+  // Stores all the numbers greater than the current median in a largerHalf,
+  // i.e median is the minimum, at the root
+  private[this] var largerHalf = PriorityQueue.empty[Double](ord.reverse)
+
+  // Returns if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+smallerHalf.isEmpty && largerHalf.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+smallerHalf.size + largerHalf.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+// If both heaps are empty, we arbitrarily insert it into a heap, let's say, the largerHalf.
+if (isEmpty) {
+  largerHalf.enqueue(x)
+} else {
+  // If the number is larger than current median, it should be inserted into largerHalf,
+  // otherwise smallerHalf.
+  if (x > median) {
+largerHalf.enqueue(x)
+  } else {
+smallerHalf.enqueue(x)
+  }
+}
+rebalance()
+  }
+
+  // Re-balance the heaps.
+  private[this] def rebalance(): Unit = {
+if (largerHalf.size - smallerHalf.size > 1) {
+  smallerHalf.enqueue(largerHalf.dequeue())
+}
+if (smallerHalf.size - largerHalf.size > 1) {
+  largerHalf.enqueue(smallerHalf.dequeue)
+}
+  }
+
+  // Returns the median of the numbers.
+  def median: Double = {
--- End diff --

minor: I find comments which basically just restate the method name to be
pretty pointless.  I'd only include them if they add something else, e.g.
preconditions, complexity, etc.  Mostly I'd say they're not necessary for
any of the methods here.
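
As an illustration, a comment that does add something for the quoted `median` method might document the complexity and the empty-heap precondition. The method body below is an assumption reconstructed from the class description, not necessarily the PR's exact code:

```scala
/**
 * Returns the median of all inserted numbers in O(1) time.
 *
 * @throws NoSuchElementException if no numbers have been inserted yet.
 */
def median: Double = {
  if (isEmpty) {
    throw new NoSuchElementException("MedianHeap is empty.")
  }
  if (smallerHalf.size == largerHalf.size) {
    // Equal sizes: the median is the average of the two middle values.
    (smallerHalf.head + largerHalf.head) / 2.0
  } else if (smallerHalf.size > largerHalf.size) {
    smallerHalf.head
  } else {
    largerHalf.head
  }
}
```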





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-16 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106453502
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.median
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
+
+assert(valid)
+  }
+
+  test("Median should be correct when size of MedianHeap is even") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 10)
+assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
+  }
+
+  test("Median should be correct when size of MedianHeap is odd") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 9)
+assert(medianHeap.median === (array(4)))
+  }
+
+  test("Size of Median should be correct though there are duplicated 
numbers inside.") {
+val array = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+Arrays.sort(array)
+assert(medianHeap.size === 10)
+assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
+  }
--- End diff --

Yes, I added this change.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-16 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106453205
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.median
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
--- End diff --

Thanks a lot for the recommendation :)





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106433116
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.median
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
--- End diff --

scalatest has a simpler pattern for this:

```scala
intercept[NoSuchElementException] {
  medianHeap.median
}
```

http://www.scalatest.org/user_guide/using_assertions

(I guess you could use `assertThrows` in this case, but I tend to always 
use `intercept` since it also lets you inspect the thrown exception.)
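
A minimal sketch of both forms side by side:

```scala
// assertThrows only checks that the expected exception type is thrown
assertThrows[NoSuchElementException] {
  medianHeap.median
}

// intercept also returns the thrown exception, so it can be inspected
val thrown = intercept[NoSuchElementException] {
  medianHeap.median
}
// further assertions on `thrown` (e.g. its message) could go here
```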





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106436689
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.median
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
+
+assert(valid)
+  }
+
+  test("Median should be correct when size of MedianHeap is even") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 10)
+assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
+  }
+
+  test("Median should be correct when size of MedianHeap is odd") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 9)
+assert(medianHeap.median === (array(4)))
+  }
+
+  test("Size of Median should be correct though there are duplicated 
numbers inside.") {
+val array = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+Arrays.sort(array)
+assert(medianHeap.size === 10)
+assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
+  }
--- End diff --

I know Kay asked for tests with some hardcoded data, but I think these
tests are too simplistic.  All of these tests insert data in order, and none
have significant skew.

Can you add a test case which does something like the following (sketched
below):
1) insert 10 elements with the same value (e.g. 5), check the median
2) insert 100 elements with a larger value (e.g. 10), check the median
3) insert 1000 elements with an even smaller value (e.g. 0), check the median
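
A sketch of such a test, following those steps (the exact values are illustrative, and the test name follows the rename suggested elsewhere in this thread):

```scala
test("Median should be correct when input data is skewed.") {
  val medianHeap = new MedianHeap()
  // 1) 10 elements with the same value: the median is that value
  (1 to 10).foreach(_ => medianHeap.insert(5))
  assert(medianHeap.median === 5)
  // 2) 100 more elements with a larger value: the middle of 110 elements falls among the 10s
  (1 to 100).foreach(_ => medianHeap.insert(10))
  assert(medianHeap.median === 10)
  // 3) 1000 more elements with a smaller value: the middle of 1110 elements falls among the 0s
  (1 to 1000).foreach(_ => medianHeap.insert(0))
  assert(medianHeap.median === 0)
}
```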






[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106434806
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.median
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
+
+assert(valid)
+  }
+
+  test("Median should be correct when size of MedianHeap is even") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 10)
+assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
+  }
+
+  test("Median should be correct when size of MedianHeap is odd") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 9)
+assert(medianHeap.median === (array(4)))
+  }
+
+  test("Size of Median should be correct though there are duplicated 
numbers inside.") {
--- End diff --

I'd remove "Size of" from the name, this is testing more than the size.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-16 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106433273
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -172,7 +172,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
 if (!isLocal && conf.getBoolean("spark.speculation", false)) {
   logInfo("Starting speculative execution thread")
-  speculationScheduler.scheduleAtFixedRate(new Runnable {
+  speculationScheduler.scheduleWithFixedDelay(new Runnable {
--- End diff --

@squito 
Thanks a lot for looking into this. I will put the change in another pr : )





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106421711
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -172,7 +172,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
 if (!isLocal && conf.getBoolean("spark.speculation", false)) {
   logInfo("Starting speculative execution thread")
-  speculationScheduler.scheduleAtFixedRate(new Runnable {
+  speculationScheduler.scheduleWithFixedDelay(new Runnable {
--- End diff --

I think you are right, this is a good change.  It's somewhat related to
the other changes here, so I personally don't feel too strongly about needing
to put it in its own pr.

I would say that if it's in this pr, the change description should be
updated to mention this as well.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106340513
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -893,6 +893,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
 val taskSet = FakeTask.createTaskSet(4)
 // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
 sc.conf.set("spark.speculation.multiplier", "0.0")
+sc.conf.set("spark.speculation", "true")
--- End diff --

This should be set, because the duration is inserted into `MedianHeap` only
when `spark.speculation` is enabled (i.e. if I remove this, `MedianHeap` will
be empty when `checkSpeculatableTasks` is called).





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106340321
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -172,7 +172,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
 if (!isLocal && conf.getBoolean("spark.speculation", false)) {
   logInfo("Starting speculative execution thread")
-  speculationScheduler.scheduleAtFixedRate(new Runnable {
+  speculationScheduler.scheduleWithFixedDelay(new Runnable {
--- End diff --

I was thinking `checkSpeculatableTasks` synchronizes on
`TaskSchedulerImpl`. If `checkSpeculatableTasks` doesn't finish within 100ms,
then the possibility exists for that thread to release and then immediately
re-acquire the lock. Should this be included in this pr?
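
For reference, a minimal standalone sketch of the scheduling mode being adopted (plain `ScheduledExecutorService` usage, not Spark's actual scheduler setup; the 100ms period mirrors the discussion):

```scala
import java.util.concurrent.{Executors, TimeUnit}

val scheduler = Executors.newSingleThreadScheduledExecutor()
val check = new Runnable {
  override def run(): Unit = {
    // With scheduleAtFixedRate, a run that exceeds the 100ms period makes the
    // next run fire immediately afterwards, re-acquiring any lock back to back.
    // With scheduleWithFixedDelay, the next run starts 100ms after this one
    // finishes, leaving a gap between lock acquisitions.
  }
}
scheduler.scheduleWithFixedDelay(check, 1000, 100, TimeUnit.MILLISECONDS)
```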





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106235234
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -911,17 +919,16 @@ private[spark] class TaskSetManager(
  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
// Can't speculate if we only have one task, and no need to speculate if the task set is a
// zombie.
-if (isZombie || numTasks == 1) {
+if (isZombie || numTasks == 1 || !speculationEnabled) {
--- End diff --

I don't think you need this change -- whether speculation is enabled is 
checked before calling this function 
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L177)





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106237592
  
--- Diff: core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
--- End diff --

import PriorityQueue directly





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106229705
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -172,7 +172,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
 if (!isLocal && conf.getBoolean("spark.speculation", false)) {
   logInfo("Starting speculative execution thread")
-  speculationScheduler.scheduleAtFixedRate(new Runnable {
+  speculationScheduler.scheduleWithFixedDelay(new Runnable {
--- End diff --

What's the reason for this change?





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106238544
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.findMedian()
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
+
+assert(valid)
+  }
+
+  test("Median should be correct when size of MedianHeap is even or odd") {
+val random = new Random()
+val medianHeap1 = new MedianHeap()
+val array1 = new Array[Int](100)
+(0 until 100).foreach {
+  case i =>
+val randomNumber = random.nextInt(1000)
+medianHeap1.insert(randomNumber)
+array1(i) += randomNumber
+}
+util.Arrays.sort(array1)
--- End diff --

Can you break this into two tests -- one for when even and one for when odd?

Also, can you manually pass in a small list of numbers, rather than using 
random?  I don't see a big benefit of using random here and the test is easier 
to debug when it's deterministic.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106237464
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -893,6 +893,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
 val taskSet = FakeTask.createTaskSet(4)
 // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
 sc.conf.set("spark.speculation.multiplier", "0.0")
+sc.conf.set("spark.speculation", "true")
--- End diff --

you can remove this once you make the change I suggested above to eliminate 
the (redundant) check





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106237709
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util
--- End diff --

import Arrays rather than the more general util





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106236633
  
--- Diff: core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap inserts number by O(log n) and returns the median by O(1) time complexity.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger half.  The sizes
+ * of two heaps need to be balanced each time when a new number is inserted so that their
+ * sizes will not be different by more than 1. Therefore each time when findMedian() is
+ * called we check if two heaps have the same size. If they do, we should return the
+ * average of the two top values of heaps. Otherwise we return the top of the heap which
+ * has one more element.
+ */
+
+private[spark] class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e median is the maximum, at the root
+  private[this] var maxHeap = mutable.PriorityQueue.empty[Double](ord)
--- End diff --

how about calling this `smallerHalf` and calling the other one 
`largerHalf`? I think that would make the other code slightly easier to read 
(otherwise it's confusing that "minHeap" actually refers to the larger half of 
the numbers)





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106238777
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,78 @@
+
+package org.apache.spark.util.collection
+
+import java.util
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.findMedian()
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
+
+assert(valid)
+  }
+
+  test("Median should be correct when size of MedianHeap is ord or even") {
+val random = new Random()
+val medianHeap1 = new MedianHeap()
+val array1 = new Array[Int](100)
+(0 until 100).foreach {
+  case i =>
+val randomNumber = random.nextInt(1000)
+medianHeap1.insert(randomNumber)
+array1(i) += randomNumber
+}
+util.Arrays.sort(array1)
+assert(medianHeap1.findMedian() === ((array1(49) + array1(50)) / 2.0))
+
+val medianHeap2 = new MedianHeap()
+val array2 = new Array[Int](101)
+(0 until 101).foreach {
+  case i =>
+val randomNumber = random.nextInt(1000)
+medianHeap2.insert(randomNumber)
+array2(i) += randomNumber
+}
+util.Arrays.sort(array2)
+assert(medianHeap2.findMedian() === array2(50))
+  }
+
+  test("Size of Median should be correct though there are duplicated 
numbers inside.") {
--- End diff --

Similar to my comment above, can you just pass in a manually created list 
of durations here (e.g., (1, 1, 2)) where it's very obvious that there are 
duplicates, and what the median is?
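
A sketch of how that could look, reusing the PR's insert/size/findMedian API (the values here are illustrative):

    test("Median should be correct when there are duplicate numbers") {
      val medianHeap = new MedianHeap()
      Seq(1.0, 1.0, 2.0).foreach(medianHeap.insert)
      assert(medianHeap.size() === 3)
      assert(medianHeap.findMedian() === 1.0)
    }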





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106235943
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,93 @@
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap inserts number by O(log n) and returns the median by O(1) 
time complexity.
--- End diff --

Can you change this first line to something like:

MedianHeap is designed to be used to quickly track the median of a group of 
numbers that may contain duplicates.  Inserting a new number has O(log n) time 
complexity and determining the median has O(1) time complexity.
(newline)

and then have the more detailed implementation description.
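
Assembled, the suggested scaladoc would read roughly:

    /**
     * MedianHeap is designed to be used to quickly track the median of a group of
     * numbers that may contain duplicates. Inserting a new number has O(log n) time
     * complexity and determining the median has O(1) time complexity.
     *
     * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The maxHeap
     * stores the smaller half of all numbers while the minHeap stores the larger half.
     * The sizes of the two heaps need to be balanced each time a new number is
     * inserted so that their sizes will not differ by more than 1. Therefore, each
     * time findMedian() is called we check if the two heaps have the same size. If
     * they do, we return the average of the two top values of the heaps. Otherwise
     * we return the top of the heap which has one more element.
     */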





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106229967
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -141,6 +143,9 @@ private[spark] class TaskSetManager(
   // Task index, start and finish time for each task attempt (indexed by 
task ID)
   val taskInfos = new HashMap[Long, TaskInfo]
 
+  // Use a MedianHeap to record durations of successful tasks when 
speculation is enabled.
--- End diff --

can you expand on this a bit to something like:

// Use a MedianHeap to record durations of successful tasks so we know when 
to launch speculative tasks.
// This is only used when speculation is enabled, to avoid the overhead of 
inserting into the heap when the
// heap won't be used.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106237170
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,93 @@
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap inserts number by O(log n) and returns the median by O(1) 
time complexity.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The 
maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger 
half.  The sizes
+ * of two heaps need to be balanced each time when a new number is 
inserted so that their
+ * sizes will not be different by more than 1. Therefore each time when 
findMedian() is
+ * called we check if two heaps have the same size. If they do, we should 
return the
+ * average of the two top values of heaps. Otherwise we return the top of 
the heap which
+ * has one more element.
+ */
+
+private[spark] class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e median is the maximum, at the root
+  private[this] var maxHeap = mutable.PriorityQueue.empty[Double](ord)
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e median is the minimum, at the root
+  private[this] var minHeap = 
mutable.PriorityQueue.empty[Double](ord.reverse)
+
+  // Returns if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+// If both heaps are empty, we arbitrarily insert it into a heap, 
let's say, the minHeap.
+if (isEmpty) {
+  minHeap.enqueue(x)
+} else {
+  // If the number is larger than current median, it should be 
inserted into minHeap,
+  // otherwise maxHeap.
+  if (x > findMedian) {
+minHeap.enqueue(x)
+  } else {
+maxHeap.enqueue(x)
+  }
+}
+rebalance()
+  }
+
+  // Re-balance the heaps.
+  private[this] def rebalance(): Unit = {
+if (minHeap.size - maxHeap.size > 1) {
+  maxHeap.enqueue(minHeap.dequeue())
+}
+if (maxHeap.size - minHeap.size > 1) {
+  minHeap.enqueue(maxHeap.dequeue)
+}
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
--- End diff --

can you just call this median, and declare it without parens (since it doesn't modify internal state)? I think that helps make it more obvious that this method is very inexpensive
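
With that change the call site reads as a cheap accessor (using the hypothetical sketch above, where findMedian is renamed to median):

    val heap = new MedianHeap()
    Seq(2.0, 4.0, 6.0).foreach(heap.insert)
    val m = heap.median  // 4.0 -- no parentheses, signalling a pure O(1) accessor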





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r105751160
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -909,19 +917,20 @@ private[spark] class TaskSetManager(
*
*/
   override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean 
= {
+
--- End diff --

nit: don't add extra blank line





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r105754479
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,130 @@
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median by O(1) time 
complexity.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The 
maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger 
half.  The sizes
+ * of two heaps need to be balanced each time when a new number is 
inserted so that their
+ * sizes will not be different by more than 1. Therefore each time when 
findMedian() is
+ * called we check if two heaps have the same size. If they do, we should 
return the
+ * average of the two top values of heaps. Otherwise we return the top of 
the heap which
+ * has one more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
--- End diff --

nit: I think `private[spark]` will fit on the same line.

Also can you update the doc to mention that insertion is `O(log n)`
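
Applied together, the two nits might look like (sketch only):

    package org.apache.spark.util.collection

    /** Inserting a number is O(log n); findMedian() returns the median in O(1). */
    private[spark] class MedianHeap(implicit val ord: Ordering[Double])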





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r105753056
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,130 @@
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median by O(1) time 
complexity.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The 
maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger 
half.  The sizes
+ * of two heaps need to be balanced each time when a new number is 
inserted so that their
+ * sizes will not be different by more than 1. Therefore each time when 
findMedian() is
+ * called we check if two heaps have the same size. If they do, we should 
return the
+ * average of the two top values of heaps. Otherwise we return the top of 
the heap which
+ * has one more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e median is the maximum, at the root
+  private[this] var maxHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e median is the minimum, at the root
+  private[this] var minHeap = mutable.PriorityQueue.empty[Double](
+implicitly[Ordering[Double]].reverse)
+
+  // Returns if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+// If both heaps are empty, we arbitrarily insert it into a heap, 
let's say, the minHeap.
+if (isEmpty) {
+  minHeap.enqueue(x)
+} else {
+  // If the number is larger than current median, it should be 
inserted into minHeap,
+  // otherwise maxHeap.
+  if (x > findMedian) {
+minHeap.enqueue(x)
+  } else {
+maxHeap.enqueue(x)
+  }
+}
+rebalance()
+  }
+
+  // Remove a number from MedianHeap; returns whether the number exists.
+  def remove(x: Double): Boolean = {
--- End diff --

since we're not using `remove` anymore, can you get rid of this method? 
(and the test?) The code may be fine, but it's certainly somewhat confusing; 
I'd rather avoid thinking about it if I can ... :)





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r105751503
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,130 @@
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median by O(1) time 
complexity.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The 
maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger 
half.  The sizes
+ * of two heaps need to be balanced each time when a new number is 
inserted so that their
+ * sizes will not be different by more than 1. Therefore each time when 
findMedian() is
+ * called we check if two heaps have the same size. If they do, we should 
return the
+ * average of the two top values of heaps. Otherwise we return the top of 
the heap which
+ * has one more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e median is the maximum, at the root
+  private[this] var maxHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
--- End diff --

can't this just be `ord` instead of `implicitly[Ordering[Double]]`?  that 
would be simpler
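
i.e., something like this (the minHeap declaration below would take `ord.reverse` the same way):

    private[this] var maxHeap = mutable.PriorityQueue.empty[Double](ord)
    private[this] var minHeap = mutable.PriorityQueue.empty[Double](ord.reverse)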





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r105751547
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,130 @@
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median by O(1) time 
complexity.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The 
maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger 
half.  The sizes
+ * of two heaps need to be balanced each time when a new number is 
inserted so that their
+ * sizes will not be different by more than 1. Therefore each time when 
findMedian() is
+ * called we check if two heaps have the same size. If they do, we should 
return the
+ * average of the two top values of heaps. Otherwise we return the top of 
the heap which
+ * has one more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e median is the maximum, at the root
+  private[this] var maxHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e median is the minimum, at the root
+  private[this] var minHeap = mutable.PriorityQueue.empty[Double](
+implicitly[Ordering[Double]].reverse)
--- End diff --

same here, `ord.reverse`.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104524384
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,94 @@
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median by O(1) time 
complexity.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The 
maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger 
half.  The sizes
+ * of two heaps need to be balanced each time when a new number is 
inserted so that their
+ * sizes will not be different by more than 1. Therefore each time when 
findMedian() is
+ * called we check if two heaps have the same size. If they do, we should 
return the
+ * average of the two top values of heaps. Otherwise we return the top of 
the heap which
+ * has one more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e median is the maximum, at the root
+  val maxHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e median is the minimum, at the root
+  val minHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]].reverse)
+
+  // Returns if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+// If both heaps are empty, we arbitrarily insert it into a heap, 
let's say, the minHeap.
+if (isEmpty) {
+  minHeap.enqueue(x)
+} else {
+  // If the number is larger than current median, it should be 
inserted into minHeap,
+  // otherwise maxHeap.
+  if (x > findMedian) {
+minHeap.enqueue(x)
+  } else {
+maxHeap.enqueue(x)
+  }
+}
+rebalance()
+  }
+
+  // Re-balance the heaps.
+  private[this] def rebalance(): Unit = {
+if (minHeap.size - maxHeap.size > 1) {
+  maxHeap.enqueue(minHeap.dequeue())
+}
+if (maxHeap.size - minHeap.size > 1) {
+  minHeap.enqueue(maxHeap.dequeue)
+}
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
+if (isEmpty) {
+  throw new NoSuchElementException("MedianHeap is empty.")
+}
+if (minHeap.size == maxHeap.size) {
+  (minHeap.head + maxHeap.head) / 2.0
--- End diff --

On second thought, since MedianHeap is being modeled as a general data 
structure that may be reused (and not an implementation detail of the TSM), it 
might be good to average and ensure consistency with the definition. That would 
be less of a surprise in future uses of the API, compared to knowing that the 
duration reported for speculative-execution debugging is not exact.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104520859
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -740,6 +743,7 @@ private[spark] class TaskSetManager(
 }
 removeRunningTask(tid)
 info.markFinished(state)
+successfulTaskDurations.remove(taskInfos(tid).duration)
--- End diff --

@kayousterhout, @squito  That would be a workaround, yes: at the expense 
of divergent data structures.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104520626
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,94 @@
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median by O(1) time 
complexity.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The 
maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger 
half.  The sizes
+ * of two heaps need to be balanced each time when a new number is 
inserted so that their
+ * sizes will not be different by more than 1. Therefore each time when 
findMedian() is
+ * called we check if two heaps have the same size. If they do, we should 
return the
+ * average of the two top values of heaps. Otherwise we return the top of 
the heap which
+ * has one more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e median is the maximum, at the root
+  val maxHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e median is the minimum, at the root
+  val minHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]].reverse)
+
+  // Returns if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+// If both heaps are empty, we arbitrarily insert it into a heap, 
let's say, the minHeap.
+if (isEmpty) {
+  minHeap.enqueue(x)
+} else {
+  // If the number is larger than current median, it should be 
inserted into minHeap,
+  // otherwise maxHeap.
+  if (x > findMedian) {
+minHeap.enqueue(x)
+  } else {
+maxHeap.enqueue(x)
+  }
+}
+rebalance()
+  }
+
+  // Re-balance the heaps.
+  private[this] def rebalance(): Unit = {
+if (minHeap.size - maxHeap.size > 1) {
+  maxHeap.enqueue(minHeap.dequeue())
+}
+if (maxHeap.size - minHeap.size > 1) {
+  minHeap.enqueue(maxHeap.dequeue)
+}
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
+if (isEmpty) {
+  throw new NoSuchElementException("MedianHeap is empty.")
+}
+if (minHeap.size == maxHeap.size) {
+  (minHeap.head + maxHeap.head) / 2.0
--- End diff --

I was proposing to keep the behavior consistent with what we had earlier - 
iirc we picked an actual duration, right?





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104519160
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,94 @@
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median by O(1) time 
complexity.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The 
maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger 
half.  The sizes
+ * of two heaps need to be balanced each time when a new number is 
inserted so that their
+ * sizes will not be different by more than 1. Therefore each time when 
findMedian() is
+ * called we check if two heaps have the same size. If they do, we should 
return the
+ * average of the two top values of heaps. Otherwise we return the top of 
the heap which
+ * has one more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e median is the maximum, at the root
+  val maxHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e median is the minimum, at the root
+  val minHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]].reverse)
+
+  // Returns if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+// If both heaps are empty, we arbitrarily insert it into a heap, 
let's say, the minHeap.
+if (isEmpty) {
+  minHeap.enqueue(x)
+} else {
+  // If the number is larger than current median, it should be 
inserted into minHeap,
+  // otherwise maxHeap.
+  if (x > findMedian) {
+minHeap.enqueue(x)
+  } else {
+maxHeap.enqueue(x)
+  }
+}
+rebalance()
+  }
+
+  // Re-balance the heaps.
+  private[this] def rebalance(): Unit = {
+if (minHeap.size - maxHeap.size > 1) {
+  maxHeap.enqueue(minHeap.dequeue())
+}
+if (maxHeap.size - minHeap.size > 1) {
+  minHeap.enqueue(maxHeap.dequeue)
+}
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
+if (isEmpty) {
+  throw new NoSuchElementException("MedianHeap is empty.")
+}
+if (minHeap.size == maxHeap.size) {
+  (minHeap.head + maxHeap.head) / 2.0
--- End diff --

So what are you proposing here?  I'm not convinced that the ability to map 
the task length threshold for speculation to a particular task (especially in a 
job with thousands of tasks) is useful enough to merit having a `median` 
function that returns something that's not technically the median.  I don't 
doubt that the speculation code path is easy to debug -- but it seems like 
there are better ways to improve the logging around that than to have an 
incorrect median implementation.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104518016
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -740,6 +743,7 @@ private[spark] class TaskSetManager(
 }
 removeRunningTask(tid)
 info.markFinished(state)
+successfulTaskDurations.remove(taskInfos(tid).duration)
--- End diff --

so can't we just change the line below back to `if (tasksSuccessful >= 
minFinishedForSpeculation && tasksSuccessful > 0) {` and still get rid of the 
call to `remove`?





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104517879
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -740,6 +743,7 @@ private[spark] class TaskSetManager(
 }
 removeRunningTask(tid)
 info.markFinished(state)
+successfulTaskDurations.remove(taskInfos(tid).duration)
--- End diff --

Ah I see you're saying that the problem is that the check that enough tasks 
have finished to enable speculation is no longer correct?  If that's the sole 
issue, why don't we just keep using `tasksSuccessful` for that check rather 
than looking at the count of items in the heap?





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104517384
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,94 @@
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median by O(1) time 
complexity.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The 
maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger 
half.  The sizes
+ * of two heaps need to be balanced each time when a new number is 
inserted so that their
+ * sizes will not be different by more than 1. Therefore each time when 
findMedian() is
+ * called we check if two heaps have the same size. If they do, we should 
return the
+ * average of the two top values of heaps. Otherwise we return the top of 
the heap which
+ * has one more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e median is the maximum, at the root
+  val maxHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e median is the minimum, at the root
+  val minHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]].reverse)
+
+  // Returns if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+// If both heaps are empty, we arbitrarily insert it into a heap, 
let's say, the minHeap.
+if (isEmpty) {
+  minHeap.enqueue(x)
+} else {
+  // If the number is larger than current median, it should be 
inserted into minHeap,
+  // otherwise maxHeap.
+  if (x > findMedian) {
+minHeap.enqueue(x)
+  } else {
+maxHeap.enqueue(x)
+  }
+}
+rebalance()
+  }
+
+  // Re-balance the heaps.
+  private[this] def rebalance(): Unit = {
+if (minHeap.size - maxHeap.size > 1) {
+  maxHeap.enqueue(minHeap.dequeue())
+}
+if (maxHeap.size - minHeap.size > 1) {
+  minHeap.enqueue(maxHeap.dequeue)
+}
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
+if (isEmpty) {
+  throw new NoSuchElementException("MedianHeap is empty.")
+}
+if (minHeap.size == maxHeap.size) {
+  (minHeap.head + maxHeap.head) / 2.0
--- End diff --

Won't that be very verbose for a large number of tasks?
I have rarely had to debug spec exec - but when I had to, it was not easy!





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104516899
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -740,6 +743,7 @@ private[spark] class TaskSetManager(
 }
 removeRunningTask(tid)
 info.markFinished(state)
+successfulTaskDurations.remove(taskInfos(tid).duration)
--- End diff --

Consider a scenario like this, particularly when pre-emption happens:
* 10k tasks to be run, 9.5k finish on 500 executors - speculative execution 
is triggered for the remaining tasks.
* Cluster load spikes, and 400 of the 500 executors are reclaimed.
* We will re-execute all the failed tasks - but those tasks are also now 
candidates for speculative execution, since they will meet the constraints (after 
this PR).

In the scenario above, with the existing code, the 500 spec tasks which are 
already running won't be stopped - but new ones won't be launched until they meet 
the criterion again.

You are right that the median duration computation itself is probably not 
invalid (it could be, but we can ignore that complexity here) - what gets affected 
is determining when to (re-)enable speculative execution of tasks from the set.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104515761
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,94 @@
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median by O(1) time 
complexity.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The 
maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger 
half.  The sizes
+ * of two heaps need to be balanced each time when a new number is 
inserted so that their
+ * sizes will not be different by more than 1. Therefore each time when 
findMedian() is
+ * called we check if two heaps have the same size. If they do, we should 
return the
+ * average of the two top values of heaps. Otherwise we return the top of 
the heap which
+ * has one more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e median is the maximum, at the root
+  val maxHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e median is the minimum, at the root
+  val minHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]].reverse)
+
+  // Returns if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+// If both heaps are empty, we arbitrarily insert it into a heap, 
let's say, the minHeap.
+if (isEmpty) {
+  minHeap.enqueue(x)
+} else {
+  // If the number is larger than current median, it should be 
inserted into minHeap,
+  // otherwise maxHeap.
+  if (x > findMedian) {
+minHeap.enqueue(x)
+  } else {
+maxHeap.enqueue(x)
+  }
+}
+rebalance()
+  }
+
+  // Re-balance the heaps.
+  private[this] def rebalance(): Unit = {
+if (minHeap.size - maxHeap.size > 1) {
+  maxHeap.enqueue(minHeap.dequeue())
+}
+if (maxHeap.size - minHeap.size > 1) {
+  minHeap.enqueue(maxHeap.dequeue)
+}
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
+if (isEmpty) {
+  throw new NoSuchElementException("MedianHeap is empty.")
+}
+if (minHeap.size == maxHeap.size) {
+  (minHeap.head + maxHeap.head) / 2.0
--- End diff --

How about adding a debug log message in the TSM for when speculation is 
triggered that lists the threshold time and all completed task times to make 
this easier to reason about?
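
One hedged way to surface that, as a standalone illustration (a hypothetical helper, not the PR's code; the real change would call logDebug inside TaskSetManager.checkSpeculatableTasks):

    // Format the threshold and all completed task durations so a speculation
    // decision can be reconstructed from the logs afterwards.
    def speculationDebugMessage(thresholdMs: Double, completedDurationsMs: Seq[Double]): String =
      s"speculation threshold: ${thresholdMs}ms, completed task durations: " +
        completedDurationsMs.sorted.mkString("[", ", ", "]")

    println(speculationDebugMessage(120.0, Seq(100.0, 110.0, 130.0)))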





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104515525
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -740,6 +743,7 @@ private[spark] class TaskSetManager(
 }
 removeRunningTask(tid)
 info.markFinished(state)
+successfulTaskDurations.remove(taskInfos(tid).duration)
--- End diff --

You mean for shuffle map tasks, where the task completed successfully but 
then the executor was lost later, so we re-run the task?  That case seems OK to 
leave the task time in, because it's still a valid task completion time.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104515191
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -740,6 +743,7 @@ private[spark] class TaskSetManager(
 }
 removeRunningTask(tid)
 info.markFinished(state)
+successfulTaskDurations.remove(taskInfos(tid).duration)
--- End diff --

What about executor failure after task completion ?





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104515039
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,94 @@
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median by O(1) time 
complexity.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The 
maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger 
half.  The sizes
+ * of two heaps need to be balanced each time when a new number is 
inserted so that their
+ * sizes will not be different by more than 1. Therefore each time when 
findMedian() is
+ * called we check if two heaps have the same size. If they do, we should 
return the
+ * average of the two top values of heaps. Otherwise we return the top of 
the heap which
+ * has one more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e median is the maximum, at the root
+  val maxHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e median is the minimum, at the root
+  val minHeap = 
mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]].reverse)
+
+  // Returns if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+// If both heaps are empty, we arbitrarily insert it into a heap, 
let's say, the minHeap.
+if (isEmpty) {
+  minHeap.enqueue(x)
+} else {
+  // If the number is larger than current median, it should be 
inserted into minHeap,
+  // otherwise maxHeap.
+  if (x > findMedian) {
+minHeap.enqueue(x)
+  } else {
+maxHeap.enqueue(x)
+  }
+}
+rebalance()
+  }
+
+  // Re-balance the heaps.
+  private[this] def rebalance(): Unit = {
+if (minHeap.size - maxHeap.size > 1) {
+  maxHeap.enqueue(minHeap.dequeue())
+}
+if (maxHeap.size - minHeap.size > 1) {
+  minHeap.enqueue(maxHeap.dequeue)
+}
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
+if (isEmpty) {
+  throw new NoSuchElementException("MedianHeap is empty.")
+}
+if (minHeap.size == maxHeap.size) {
+  (minHeap.head + maxHeap.head) / 2.0
--- End diff --

You are right, the strict definition requires us to average - it just makes
it difficult to reason from the logs when the reported duration does not
actually exist in the data (when there are discontinuities) :-)
(currently, it is simply 2/3 * threshold if we want to narrow down spec
exec task triggers)





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104514689
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -740,6 +743,7 @@ private[spark] class TaskSetManager(
     }
     removeRunningTask(tid)
     info.markFinished(state)
+    successfulTaskDurations.remove(taskInfos(tid).duration)
--- End diff --

@mridulm how would that happen?  When would the short times from the failed 
tasks be added to successfulTaskDurations?





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104513453
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -740,6 +743,7 @@ private[spark] class TaskSetManager(
     }
     removeRunningTask(tid)
     info.markFinished(state)
+    successfulTaskDurations.remove(taskInfos(tid).duration)
--- End diff --

When we have executor failures (or pre-emption), this can essentially cause a
large number of sub-optimal (new) speculative tasks to be executed.
Tasks which had not succeeded can be handled by looking at taskInfo to see whether
the task has completed already? (Yes, an if condition is required before the
remove.)
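
A minimal sketch of the guard I have in mind (assuming `TaskInfo.successful` is the
right flag to check - the exact field here is for illustration):

    // only remove the duration if it was actually recorded for a successful attempt
    if (taskInfos(tid).successful) {
      successfulTaskDurations.remove(taskInfos(tid).duration)
    }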





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104508618
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -740,6 +743,7 @@ private[spark] class TaskSetManager(
     }
     removeRunningTask(tid)
     info.markFinished(state)
+    successfulTaskDurations.remove(taskInfos(tid).duration)
--- End diff --

I wouldn't bother with this, because the remove method is complicated (and dropping
this line of code lets you eliminate it), and the median is useful here precisely
because it's not very sensitive to outlier values, so I think it's fine to leave
failed tasks in.  Also, I'm not sure this code is correct: it's possible that the
duration hadn't yet been added to successfulTaskDurations, in which case you could
be removing a duration that happens to be the same but actually corresponds to a
successful task.
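
In miniature, the hazard is (values hypothetical):

    successfulTaskDurations.insert(10.0)  // recorded when task A succeeded
    // task B fails after also running for 10.0 seconds; its duration was never inserted
    // successfulTaskDurations.remove(10.0) would now drop task A's sample, not task B's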





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-06 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104508178
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median in O(1) time.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger half. The sizes
+ * of the two heaps need to be balanced each time a new number is inserted so that they
+ * never differ by more than 1. Therefore each time findMedian() is called we check
+ * whether the two heaps have the same size. If they do, we return the average of the
+ * two top values of the heaps. Otherwise we return the top of the heap which has one
+ * more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e. the median is the maximum, at the root.
+  val maxHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e. the median is the minimum, at the root.
+  val minHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]].reverse)
+
+  // Returns true if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+    maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+    maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+    // If both heaps are empty, we arbitrarily insert it into a heap, let's say, the minHeap.
+    if (isEmpty) {
+      minHeap.enqueue(x)
+    } else {
+      // If the number is larger than the current median, it should be inserted into
+      // the minHeap, otherwise into the maxHeap.
+      if (x > findMedian) {
+        minHeap.enqueue(x)
+      } else {
+        maxHeap.enqueue(x)
+      }
+    }
+    rebalance()
+  }
+
+  // Re-balance the heaps so their sizes never differ by more than 1.
+  private[this] def rebalance(): Unit = {
+    if (minHeap.size - maxHeap.size > 1) {
+      maxHeap.enqueue(minHeap.dequeue())
+    }
+    if (maxHeap.size - minHeap.size > 1) {
+      minHeap.enqueue(maxHeap.dequeue())
+    }
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
+    if (isEmpty) {
+      throw new NoSuchElementException("MedianHeap is empty.")
+    }
+    if (minHeap.size == maxHeap.size) {
+      (minHeap.head + maxHeap.head) / 2.0
--- End diff --

@mridulm it seems like we should be using the traditional definition of the
median here, and not what you would prefer the definition of the median to be!





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104352100
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median in O(1) time.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger half. The sizes
+ * of the two heaps need to be balanced each time a new number is inserted so that they
+ * never differ by more than 1. Therefore each time findMedian() is called we check
+ * whether the two heaps have the same size. If they do, we return the average of the
+ * two top values of the heaps. Otherwise we return the top of the heap which has one
+ * more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e. the median is the maximum, at the root.
+  val maxHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e. the median is the minimum, at the root.
+  val minHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]].reverse)
+
+  // Returns true if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+    maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+    maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+    // If both heaps are empty, we arbitrarily insert it into a heap, let's say, the minHeap.
+    if (isEmpty) {
+      minHeap.enqueue(x)
+    } else {
+      // If the number is larger than the current median, it should be inserted into
+      // the minHeap, otherwise into the maxHeap.
+      if (x > findMedian) {
+        minHeap.enqueue(x)
+      } else {
+        maxHeap.enqueue(x)
+      }
+    }
+    rebalance()
+  }
+
+  // Re-balance the heaps so their sizes never differ by more than 1.
+  private[this] def rebalance(): Unit = {
+    if (minHeap.size - maxHeap.size > 1) {
+      maxHeap.enqueue(minHeap.dequeue())
+    }
+    if (maxHeap.size - minHeap.size > 1) {
+      minHeap.enqueue(maxHeap.dequeue())
+    }
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
+    if (isEmpty) {
+      throw new NoSuchElementException("MedianHeap is empty.")
+    }
+    if (minHeap.size == maxHeap.size) {
+      (minHeap.head + maxHeap.head) / 2.0
--- End diff --

I prefer actual values, not synthetic ones.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104352237
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -754,7 +743,6 @@ private[spark] class TaskSetManager(
     }
     removeRunningTask(tid)
     info.markFinished(state)
-    successfulTaskIdsSet -= tid
--- End diff --

Not handling task failure will cause other issues - we need to keep data 
structures consistent - cost is secondary to correctness for a performance PR.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104351990
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.findMedian()
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
+
+assert(valid)
+  }
+
+  test("Median should be correct when size of MedianHeap is ord or even") {
+val random = new Random()
+val medianHeap1 = new MedianHeap()
+val array1 = new Array[Int](100)
+(0 until 100).foreach {
+  case i =>
+val randomNumber = random.nextInt(1000)
+medianHeap1.insert(randomNumber)
+array1(i) += randomNumber
+}
+util.Arrays.sort(array1)
+assert(medianHeap1.findMedian() === ((array1(49) + array1(50)) / 2.0))
+
+val medianHeap2 = new MedianHeap()
+val array2 = new Array[Int](101)
+(0 until 101).foreach {
+  case i =>
+val randomNumber = random.nextInt(1000)
+medianHeap2.insert(randomNumber)
+array2(i) += randomNumber
+}
+util.Arrays.sort(array2)
+assert(medianHeap2.findMedian() === array2(50))
+  }
+
+  test("Size of Median should be correct though there are duplicated 
numbers inside.") {
--- End diff --

Yes, but no validation of the median is being done.
Either enhance this test to validate the median, or add a new test to do so
(if this one is meant to test only the size with duplicates).
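
Something along these lines would do it (a sketch, not tested):

    test("Median should be correct even when the inserted numbers contain duplicates") {
      val medianHeap = new MedianHeap()
      // a known multiset: ten 0s, ten 1s, ten 2s - the median must be 1
      (0 until 10).foreach(_ => medianHeap.insert(0.0))
      (0 until 10).foreach(_ => medianHeap.insert(1.0))
      (0 until 10).foreach(_ => medianHeap.insert(2.0))
      assert(medianHeap.size() === 30)
      assert(medianHeap.findMedian() === 1.0)
    }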





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104344524
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -754,7 +743,6 @@ private[spark] class TaskSetManager(
     }
     removeRunningTask(tid)
     info.markFinished(state)
-    successfulTaskIdsSet -= tid
--- End diff --

Removing a duration from `successfulTaskDurations` is quite time consuming (O(n))
now. We only use `successfulTaskDurations` to generate the median duration, so I
would hesitate to do the remove.
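
(The two PriorityQueues do not support removing an arbitrary element, so a remove
has to rebuild a heap - roughly the sketch below, assuming the `mutable` import
already in MedianHeap.scala.)

    // O(n): drop one occurrence of x and re-heapify
    def removeFromHeap(heap: mutable.PriorityQueue[Double], x: Double): Unit = {
      val rest = heap.toList.diff(List(x))
      heap.clear()
      rest.foreach(heap.enqueue(_))
    }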





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104344274
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median in O(1) time.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger half. The sizes
+ * of the two heaps need to be balanced each time a new number is inserted so that they
+ * never differ by more than 1. Therefore each time findMedian() is called we check
+ * whether the two heaps have the same size. If they do, we return the average of the
+ * two top values of the heaps. Otherwise we return the top of the heap which has one
+ * more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e. the median is the maximum, at the root.
+  val maxHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e. the median is the minimum, at the root.
+  val minHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]].reverse)
+
+  // Returns true if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+    maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+    maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+    // If both heaps are empty, we arbitrarily insert it into a heap, let's say, the minHeap.
+    if (isEmpty) {
+      minHeap.enqueue(x)
+    } else {
+      // If the number is larger than the current median, it should be inserted into
+      // the minHeap, otherwise into the maxHeap.
+      if (x > findMedian) {
+        minHeap.enqueue(x)
+      } else {
+        maxHeap.enqueue(x)
+      }
+    }
+    rebalance()
+  }
+
+  // Re-balance the heaps so their sizes never differ by more than 1.
+  private[this] def rebalance(): Unit = {
+    if (minHeap.size - maxHeap.size > 1) {
+      maxHeap.enqueue(minHeap.dequeue())
+    }
+    if (maxHeap.size - minHeap.size > 1) {
+      minHeap.enqueue(maxHeap.dequeue())
+    }
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
+    if (isEmpty) {
+      throw new NoSuchElementException("MedianHeap is empty.")
+    }
+    if (minHeap.size == maxHeap.size) {
+      (minHeap.head + maxHeap.head) / 2.0
--- End diff --

I think the average of the two middle values is the definition of 'median' when
there is an even number of elements. Maybe it's better to keep it as
`(minHeap.head + maxHeap.head) / 2.0`?
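
For example, for the four durations 1, 2, 3, 4 the textbook median is 2.5:

    val heap = new MedianHeap()
    Seq(1.0, 2.0, 3.0, 4.0).foreach(heap.insert)
    assert(heap.findMedian() == 2.5)  // (2.0 + 3.0) / 2.0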





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104344072
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.findMedian()
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
+
+assert(valid)
+  }
+
+  test("Median should be correct when size of MedianHeap is ord or even") {
+val random = new Random()
+val medianHeap1 = new MedianHeap()
+val array1 = new Array[Int](100)
+(0 until 100).foreach {
+  case i =>
+val randomNumber = random.nextInt(1000)
+medianHeap1.insert(randomNumber)
+array1(i) += randomNumber
+}
+util.Arrays.sort(array1)
+assert(medianHeap1.findMedian() === ((array1(49) + array1(50)) / 2.0))
+
+val medianHeap2 = new MedianHeap()
+val array2 = new Array[Int](101)
+(0 until 101).foreach {
+  case i =>
+val randomNumber = random.nextInt(1000)
+medianHeap2.insert(randomNumber)
+array2(i) += randomNumber
+}
+util.Arrays.sort(array2)
+assert(medianHeap2.findMedian() === array2(50))
+  }
+
+  test("Size of Median should be correct though there are duplicated 
numbers inside.") {
--- End diff --

random.nextInt(100) returns a value between 0 (inclusive) and the specified value
(exclusive). If I call it 1000 times, there must be duplicates.
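
i.e., by the pigeonhole principle:

    val random = new Random()
    val draws = Seq.fill(1000)(random.nextInt(100))
    assert(draws.distinct.size <= 100)  // 1000 draws, at most 100 distinct values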





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104341778
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median in O(1) time.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger half. The sizes
+ * of the two heaps need to be balanced each time a new number is inserted so that they
+ * never differ by more than 1. Therefore each time findMedian() is called we check
+ * whether the two heaps have the same size. If they do, we return the average of the
+ * two top values of the heaps. Otherwise we return the top of the heap which has one
+ * more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e. the median is the maximum, at the root.
+  val maxHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e. the median is the minimum, at the root.
+  val minHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]].reverse)
+
+  // Returns true if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+    maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+    maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+    // If both heaps are empty, we arbitrarily insert it into a heap, let's say, the minHeap.
+    if (isEmpty) {
+      minHeap.enqueue(x)
+    } else {
+      // If the number is larger than the current median, it should be inserted into
+      // the minHeap, otherwise into the maxHeap.
+      if (x > findMedian) {
+        minHeap.enqueue(x)
+      } else {
+        maxHeap.enqueue(x)
+      }
+    }
+    rebalance()
+  }
+
+  // Re-balance the heaps so their sizes never differ by more than 1.
+  private[this] def rebalance(): Unit = {
+    if (minHeap.size - maxHeap.size > 1) {
+      maxHeap.enqueue(minHeap.dequeue())
+    }
+    if (maxHeap.size - minHeap.size > 1) {
+      minHeap.enqueue(maxHeap.dequeue())
+    }
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
+    if (isEmpty) {
+      throw new NoSuchElementException("MedianHeap is empty.")
+    }
+    if (minHeap.size == maxHeap.size) {
+      (minHeap.head + maxHeap.head) / 2.0
--- End diff --

Why interpolate? Simply pick one.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104341854
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -754,7 +743,6 @@ private[spark] class TaskSetManager(
     }
     removeRunningTask(tid)
     info.markFinished(state)
-    successfulTaskIdsSet -= tid
--- End diff --

It is necessary to re-add equivalent functionality.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104341937
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.findMedian()
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
+
+assert(valid)
+  }
+
+  test("Median should be correct when size of MedianHeap is ord or even") {
--- End diff --

ord -> odd





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104342097
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.findMedian()
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
+
+assert(valid)
+  }
+
+  test("Median should be correct when size of MedianHeap is ord or even") {
+val random = new Random()
+val medianHeap1 = new MedianHeap()
+val array1 = new Array[Int](100)
+(0 until 100).foreach {
+  case i =>
+val randomNumber = random.nextInt(1000)
+medianHeap1.insert(randomNumber)
+array1(i) += randomNumber
+}
+util.Arrays.sort(array1)
+assert(medianHeap1.findMedian() === ((array1(49) + array1(50)) / 2.0))
+
+val medianHeap2 = new MedianHeap()
+val array2 = new Array[Int](101)
+(0 until 101).foreach {
+  case i =>
+val randomNumber = random.nextInt(1000)
+medianHeap2.insert(randomNumber)
+array2(i) += randomNumber
+}
+util.Arrays.sort(array2)
+assert(medianHeap2.findMedian() === array2(50))
+  }
+
+  test("Size of Median should be correct though there are duplicated 
numbers inside.") {
--- End diff --

Also add/enhance this to ensure the median is correct even if duplicates exist.
In the test above, that is not guaranteed (duplicates occur only if random happens
to generate them).





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-28 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r103516283
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -911,14 +916,14 @@ private[spark] class TaskSetManager(
 logDebug("Checking for speculative tasks: minFinished = " + 
minFinishedForSpeculation)
 if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 
0) {
   val time = clock.getTimeMillis()
-  val durations = 
taskInfos.values.filter(_.successful).map(_.duration).toArray
-  Arrays.sort(durations)
+  val durations = successfulTasksSet.toArray.map(taskInfos(_).duration)
   val medianDuration = durations(min((0.5 * 
tasksSuccessful).round.toInt, durations.length - 1))
   val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 
minTimeToSpeculation)
   // TODO: Threshold should also look at standard deviation of task 
durations and have a lower
   // bound based on that.
   logDebug("Task length threshold for speculation: " + threshold)
-  for ((tid, info) <- taskInfos) {
+  for (tid <- runningTasksSet) {
+val info = taskInfos(tid)
--- End diff --

@squito @kayousterhout @jinxing64 Just to add, the other change does look
interesting - and I can definitely see potential value in it. For example, when I
was running jobs with 200k - 400k tasks, this never came up (though my configs were
probably different from yours) - so it would be good to see the actual impact of
the other changes.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-27 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r103391138
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -911,14 +916,14 @@ private[spark] class TaskSetManager(
 logDebug("Checking for speculative tasks: minFinished = " + 
minFinishedForSpeculation)
 if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 
0) {
   val time = clock.getTimeMillis()
-  val durations = 
taskInfos.values.filter(_.successful).map(_.duration).toArray
-  Arrays.sort(durations)
+  val durations = successfulTasksSet.toArray.map(taskInfos(_).duration)
   val medianDuration = durations(min((0.5 * 
tasksSuccessful).round.toInt, durations.length - 1))
   val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 
minTimeToSpeculation)
   // TODO: Threshold should also look at standard deviation of task 
durations and have a lower
   // bound based on that.
   logDebug("Task length threshold for speculation: " + threshold)
-  for ((tid, info) <- taskInfos) {
+  for (tid <- runningTasksSet) {
+val info = taskInfos(tid)
--- End diff --

@kayousterhout 
Thanks a lot for your comments :)
I will keep this simple change in this PR. For the time complexity improvement,
I will make another PR and try to add some measurements.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-27 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r103390199
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -911,14 +916,14 @@ private[spark] class TaskSetManager(
 logDebug("Checking for speculative tasks: minFinished = " + 
minFinishedForSpeculation)
 if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 
0) {
   val time = clock.getTimeMillis()
-  val durations = 
taskInfos.values.filter(_.successful).map(_.duration).toArray
-  Arrays.sort(durations)
+  val durations = successfulTasksSet.toArray.map(taskInfos(_).duration)
   val medianDuration = durations(min((0.5 * 
tasksSuccessful).round.toInt, durations.length - 1))
   val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 
minTimeToSpeculation)
   // TODO: Threshold should also look at standard deviation of task 
durations and have a lower
   // bound based on that.
   logDebug("Task length threshold for speculation: " + threshold)
-  for ((tid, info) <- taskInfos) {
+  for (tid <- runningTasksSet) {
+val info = taskInfos(tid)
--- End diff --

Echoing what Imran said -- I'm definitely +1 on merging this simple change. 
 The other changes in this PR add a bunch of complexity, so I'd need to see 
measurements demonstrating a significant improvement in performance to be 
convinced that we should merge them.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r103258743
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -909,16 +921,17 @@ private[spark] class TaskSetManager(
     var foundTasks = false
     val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
-    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
+    val successfulTaskIdsSize = successfulTaskIdsSet.size
+    if (successfulTaskIdsSize >= minFinishedForSpeculation && successfulTaskIdsSize > 0) {
       val time = clock.getTimeMillis()
-      val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
-      Arrays.sort(durations)
-      val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
+      val medianDuration = successfulTaskIdsSet.slice(
+        successfulTaskIdsSize / 2, successfulTaskIdsSize / 2 + 1).head
--- End diff --

.duration
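
i.e. something like the following (`medianTaskId` is just an illustrative name):

    +      val medianTaskId = successfulTaskIdsSet.slice(
    +        successfulTaskIdsSize / 2, successfulTaskIdsSize / 2 + 1).head
    +      val medianDuration = taskInfos(medianTaskId).duration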





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r102292628
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -137,6 +137,12 @@ private[spark] class TaskSetManager(
   // Task index, start and finish time for each task attempt (indexed by task ID)
   val taskInfos = new HashMap[Long, TaskInfo]
 
+  val successfulTasksSet = new scala.collection.mutable.TreeSet[Long] {
--- End diff --

this can't be a set, since multiple tasks might be running for the same 
amount of time.  you could change it to a set of `(time, count)` pairs, which 
would make the update logic a bit more complicated, or just keep a list of all 
runtimes.

also the name should indicate that it's the durations, eg.
`successfulTaskDurations`
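
a `(time, count)` multiset could look roughly like this (a sketch using
java.util.TreeMap; the names are illustrative, not a concrete proposal):

    val successfulTaskDurations = new java.util.TreeMap[Long, Int]()

    def addDuration(d: Long): Unit = {
      successfulTaskDurations.put(d, successfulTaskDurations.getOrDefault(d, 0) + 1)
    }

    def removeDuration(d: Long): Unit = {
      val count = successfulTaskDurations.getOrDefault(d, 0)
      if (count <= 1) successfulTaskDurations.remove(d)
      else successfulTaskDurations.put(d, count - 1)
    }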





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r102292835
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -911,14 +916,14 @@ private[spark] class TaskSetManager(
 logDebug("Checking for speculative tasks: minFinished = " + 
minFinishedForSpeculation)
 if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 
0) {
   val time = clock.getTimeMillis()
-  val durations = 
taskInfos.values.filter(_.successful).map(_.duration).toArray
-  Arrays.sort(durations)
+  val durations = successfulTasksSet.toArray.map(taskInfos(_).duration)
--- End diff --

there are some other things you could do to make the original code more 
efficient:
a) instead of `.values.filter.map`, use a `foreach` to directly add into an 
`ArrayBuffer`.  That will avoid all the intermediate collections that scala 
would create otherwise.
b) store an approximate distribution of the runtimes, eg. using a tdigest.

aside: what a pain that there is no quick way to get the middle element of a
TreeSet -- I couldn't find anything efficient in either the java or scala libs :(
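
for (a) above, a rough sketch (assuming an `ArrayBuffer` import is in scope in
TaskSetManager):

    // single pass, one ArrayBuffer, no intermediate collections
    val durations = new ArrayBuffer[Long](taskInfos.size)
    taskInfos.values.foreach { info =>
      if (info.successful) durations += info.duration
    }
    val sorted = durations.toArray
    java.util.Arrays.sort(sorted)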





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r102293021
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -911,14 +916,14 @@ private[spark] class TaskSetManager(
 logDebug("Checking for speculative tasks: minFinished = " + 
minFinishedForSpeculation)
 if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 
0) {
   val time = clock.getTimeMillis()
-  val durations = 
taskInfos.values.filter(_.successful).map(_.duration).toArray
-  Arrays.sort(durations)
+  val durations = successfulTasksSet.toArray.map(taskInfos(_).duration)
   val medianDuration = durations(min((0.5 * 
tasksSuccessful).round.toInt, durations.length - 1))
   val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 
minTimeToSpeculation)
   // TODO: Threshold should also look at standard deviation of task 
durations and have a lower
   // bound based on that.
   logDebug("Task length threshold for speculation: " + threshold)
-  for ((tid, info) <- taskInfos) {
+  for (tid <- runningTasksSet) {
+val info = taskInfos(tid)
--- End diff --

oh, good find, this alone looks like a worthwhile fix.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-08 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16867

[SPARK-16929] Improve performance when check speculatable tasks.

## What changes were proposed in this pull request?

When checking for speculatable tasks in `TaskSetManager`, the current code scans
all task infos and sorts the durations of the successful tasks, which is
O(N log N). Since the `TaskSchedulerImpl` synchronized lock is acquired during the
check, this can cause performance degradation when checking a large task set, say
hundreds of thousands of tasks.

This change uses a `TreeSet` to cache the successful task infos and compares the
median duration against the running tasks, avoiding a scan of all task infos.
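
For reference, the hot path being replaced looks roughly like this (paraphrased
from the current `checkSpeculatableTasks`):

    // O(N) scan over all task infos plus an O(N log N) sort, under the scheduler lock
    val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
    Arrays.sort(durations)
    val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
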
## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-16929

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16867.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16867


commit 1169d118662a9bfdabe88238352fe834a28aee14
Author: jinxing 
Date:   2017-02-07T02:35:10Z

[SPARK-16929] Improve performance when check speculatable tasks.



