git commit: [SPARK] Fix NPE for ExternalAppendOnlyMap

2014-07-03 Thread adav
Repository: spark
Updated Branches:
  refs/heads/master 3bbeca648 -> c48053773


[SPARK] Fix NPE for ExternalAppendOnlyMap

It did not handle null keys very gracefully before.

Author: Andrew Or andrewo...@gmail.com

Closes #1288 from andrewor14/fix-external and squashes the following commits:

312b8d8 [Andrew Or] Abstract key hash code
ed5adf9 [Andrew Or] Fix NPE for ExternalAppendOnlyMap


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4805377
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4805377
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4805377

Branch: refs/heads/master
Commit: c480537739f9329ebfd580f09c69778e6c976366
Parents: 3bbeca6
Author: Andrew Or andrewo...@gmail.com
Authored: Thu Jul 3 10:26:50 2014 -0700
Committer: Aaron Davidson aa...@databricks.com
Committed: Thu Jul 3 10:26:50 2014 -0700

--
 .../util/collection/ExternalAppendOnlyMap.scala | 30 ++--
 .../collection/ExternalAppendOnlyMapSuite.scala | 27 --
 2 files changed, 46 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c4805377/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 288badd..292d096 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -252,7 +252,7 @@ class ExternalAppendOnlyMap[K, V, C](
   if (it.hasNext) {
 var kc = it.next()
 kcPairs += kc
-val minHash = kc._1.hashCode()
+val minHash = getKeyHashCode(kc)
 while (it.hasNext && it.head._1.hashCode() == minHash) {
   kc = it.next()
   kcPairs += kc
@@ -294,8 +294,9 @@ class ExternalAppendOnlyMap[K, V, C](
   // Select a key from the StreamBuffer that holds the lowest key hash
   val minBuffer = mergeHeap.dequeue()
   val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
-  var (minKey, minCombiner) = minPairs.remove(0)
-  assert(minKey.hashCode() == minHash)
+  val minPair = minPairs.remove(0)
+  var (minKey, minCombiner) = minPair
+  assert(getKeyHashCode(minPair) == minHash)
 
   // For all other streams that may have this key (i.e. have the same 
minimum key hash),
   // merge in the corresponding value (if any) from that stream
@@ -327,15 +328,16 @@ class ExternalAppendOnlyMap[K, V, C](
  * StreamBuffers are ordered by the minimum key hash found across all of 
their own pairs.
  */
 private class StreamBuffer(
-val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
+val iterator: BufferedIterator[(K, C)],
+val pairs: ArrayBuffer[(K, C)])
   extends Comparable[StreamBuffer] {
 
   def isEmpty = pairs.length == 0
 
   // Invalid if there are no more pairs in this stream
-  def minKeyHash = {
+  def minKeyHash: Int = {
 assert(pairs.length > 0)
-pairs.head._1.hashCode()
+getKeyHashCode(pairs.head)
   }
 
   override def compareTo(other: StreamBuffer): Int = {
@@ -422,10 +424,22 @@ class ExternalAppendOnlyMap[K, V, C](
 }
 
 private[spark] object ExternalAppendOnlyMap {
+
+  /**
+   * Return the key hash code of the given (key, combiner) pair.
+   * If the key is null, return a special hash code.
+   */
+  private def getKeyHashCode[K, C](kc: (K, C)): Int = {
+if (kc._1 == null) 0 else kc._1.hashCode()
+  }
+
+  /**
+   * A comparator for (key, combiner) pairs based on their key hash codes.
+   */
   private class KCComparator[K, C] extends Comparator[(K, C)] {
 def compare(kc1: (K, C), kc2: (K, C)): Int = {
-  val hash1 = kc1._1.hashCode()
-  val hash2 = kc2._1.hashCode()
+  val hash1 = getKeyHashCode(kc1)
+  val hash2 = getKeyHashCode(kc2)
   if (hash1 < hash2) -1 else if (hash1 == hash2) 0 else 1
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c4805377/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index deb7809..4288229 100644
--- 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ 

git commit: [SPARK] Fix NPE for ExternalAppendOnlyMap

2014-07-03 Thread adav
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 87b74a9bf -> fdee6ee06


[SPARK] Fix NPE for ExternalAppendOnlyMap

It did not handle null keys very gracefully before.

Author: Andrew Or andrewo...@gmail.com

Closes #1288 from andrewor14/fix-external and squashes the following commits:

312b8d8 [Andrew Or] Abstract key hash code
ed5adf9 [Andrew Or] Fix NPE for ExternalAppendOnlyMap

(cherry picked from commit c480537739f9329ebfd580f09c69778e6c976366)
Signed-off-by: Aaron Davidson aa...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdee6ee0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdee6ee0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdee6ee0

Branch: refs/heads/branch-1.0
Commit: fdee6ee0655f04e9a0d3a66f2e8df5486a5ea032
Parents: 87b74a9
Author: Andrew Or andrewo...@gmail.com
Authored: Thu Jul 3 10:26:50 2014 -0700
Committer: Aaron Davidson aa...@databricks.com
Committed: Thu Jul 3 10:28:06 2014 -0700

--
 .../util/collection/ExternalAppendOnlyMap.scala | 30 ++--
 .../collection/ExternalAppendOnlyMapSuite.scala | 27 --
 2 files changed, 46 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fdee6ee0/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 288badd..292d096 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -252,7 +252,7 @@ class ExternalAppendOnlyMap[K, V, C](
   if (it.hasNext) {
 var kc = it.next()
 kcPairs += kc
-val minHash = kc._1.hashCode()
+val minHash = getKeyHashCode(kc)
 while (it.hasNext && it.head._1.hashCode() == minHash) {
   kc = it.next()
   kcPairs += kc
@@ -294,8 +294,9 @@ class ExternalAppendOnlyMap[K, V, C](
   // Select a key from the StreamBuffer that holds the lowest key hash
   val minBuffer = mergeHeap.dequeue()
   val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
-  var (minKey, minCombiner) = minPairs.remove(0)
-  assert(minKey.hashCode() == minHash)
+  val minPair = minPairs.remove(0)
+  var (minKey, minCombiner) = minPair
+  assert(getKeyHashCode(minPair) == minHash)
 
   // For all other streams that may have this key (i.e. have the same 
minimum key hash),
   // merge in the corresponding value (if any) from that stream
@@ -327,15 +328,16 @@ class ExternalAppendOnlyMap[K, V, C](
  * StreamBuffers are ordered by the minimum key hash found across all of 
their own pairs.
  */
 private class StreamBuffer(
-val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
+val iterator: BufferedIterator[(K, C)],
+val pairs: ArrayBuffer[(K, C)])
   extends Comparable[StreamBuffer] {
 
   def isEmpty = pairs.length == 0
 
   // Invalid if there are no more pairs in this stream
-  def minKeyHash = {
+  def minKeyHash: Int = {
 assert(pairs.length > 0)
-pairs.head._1.hashCode()
+getKeyHashCode(pairs.head)
   }
 
   override def compareTo(other: StreamBuffer): Int = {
@@ -422,10 +424,22 @@ class ExternalAppendOnlyMap[K, V, C](
 }
 
 private[spark] object ExternalAppendOnlyMap {
+
+  /**
+   * Return the key hash code of the given (key, combiner) pair.
+   * If the key is null, return a special hash code.
+   */
+  private def getKeyHashCode[K, C](kc: (K, C)): Int = {
+if (kc._1 == null) 0 else kc._1.hashCode()
+  }
+
+  /**
+   * A comparator for (key, combiner) pairs based on their key hash codes.
+   */
   private class KCComparator[K, C] extends Comparator[(K, C)] {
 def compare(kc1: (K, C), kc2: (K, C)): Int = {
-  val hash1 = kc1._1.hashCode()
-  val hash2 = kc2._1.hashCode()
+  val hash1 = getKeyHashCode(kc1)
+  val hash2 = getKeyHashCode(kc2)
   if (hash1 < hash2) -1 else if (hash1 == hash2) 0 else 1
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdee6ee0/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index deb7809..4288229 100644
---