spark git commit: [SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill

2015-09-10 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 49da38e5f -> e04811137


[SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill

Data Spill with UnsafeRow causes assert failure.

```
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:165)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
```

To reproduce it (thanks andrewor14):
```scala
bin/spark-shell --master local \
  --conf spark.shuffle.memoryFraction=0.005 \
  --conf spark.shuffle.sort.bypassMergeThreshold=0

sc.parallelize(1 to 2 * 1000 * 1000, 10)
  .map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count()
```
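
Why this setup trips the assertion: `spark.shuffle.memoryFraction=0.005` shrinks the shuffle memory budget so `ExternalSorter` spills to disk almost immediately, and `spark.shuffle.sort.bypassMergeThreshold=0` disables the bypass-merge writer so the shuffle stays on the sort-based path shown in the stack trace. The same reproducer as a standalone sketch, assuming stock Spark 1.5-era APIs (`SparkConf`, `SQLContext`); it does nothing beyond what the shell session above does:

```scala
// Standalone version of the shell reproducer (a sketch, assuming Spark 1.5-era
// APIs). The tiny memoryFraction forces spills; the zero bypassMergeThreshold
// forces the sort-based shuffle write path from the stack trace.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object Spark10466Repro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("SPARK-10466 repro")
      .set("spark.shuffle.memoryFraction", "0.005")
      .set("spark.shuffle.sort.bypassMergeThreshold", "0")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Enough rows that the aggregation's shuffle data exceeds the tiny memory
    // budget and spills to disk, which is what exercises writeKey on re-write.
    sc.parallelize(1 to 2 * 1000 * 1000, 10)
      .map(i => (i, i)).toDF("a", "b").groupBy("b").avg().count()

    sc.stop()
  }
}
```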

Author: Cheng Hao 

Closes #8635 from chenghao-intel/unsafe_spill.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0481113
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0481113
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0481113

Branch: refs/heads/master
Commit: e04811137680f937669cdcc78771227aeb7cd849
Parents: 49da38e
Author: Cheng Hao 
Authored: Thu Sep 10 11:48:43 2015 -0700
Committer: Andrew Or 
Committed: Thu Sep 10 11:48:43 2015 -0700

--
 .../spark/util/collection/ExternalSorter.scala   |  6 ++
 .../sql/execution/UnsafeRowSerializer.scala      |  2 +-
 .../execution/UnsafeRowSerializerSuite.scala     | 64 ++--
 3 files changed, 67 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e0481113/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 19287ed..138c05d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -188,6 +188,12 @@ private[spark] class ExternalSorter[K, V, C](
 
   private val spills = new ArrayBuffer[SpilledFile]
 
+  /**
+   * Number of files this sorter has spilled so far.
+   * Exposed for testing.
+   */
+  private[spark] def numSpills: Int = spills.size
+
   override def insertAll(records: Iterator[Product2[K, V]]): Unit = {
     // TODO: stop combining if we find that the reduction factor isn't high
     val shouldCombine = aggregator.isDefined
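
The only change to `ExternalSorter` is this test hook, presumably so the regression test added to `UnsafeRowSerializerSuite` can assert that a spill really happened rather than passing trivially. A hedged, self-contained way to make the same check from the reproducer's shell session, using task-level spill metrics instead of reaching into the sorter (the listener and `diskBytesSpilled` are standard Spark APIs; the workload mirrors the repro above):

```scala
// Run inside the spark-shell session started with the repro configs (assumes
// `sc` and the shell's auto-imported SQL implicits). Accumulates per-task disk
// spill bytes via a listener, then checks the job actually spilled.
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val spilledBytes = new AtomicLong(0L)
sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) spilledBytes.addAndGet(metrics.diskBytesSpilled)
  }
})

sc.parallelize(1 to 2 * 1000 * 1000, 10)
  .map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count()

// The listener bus is asynchronous; give it a moment to drain before checking.
Thread.sleep(1000)
assert(spilledBytes.get > 0, "no spill occurred; lower spark.shuffle.memoryFraction further")
```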

http://git-wip-us.apache.org/repos/asf/spark/blob/e0481113/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 5c18558..e060c06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -72,7 +72,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
   override def writeKey[T: ClassTag](key: T): SerializationStream = {
     // The key is only needed on the map side when
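
The hunk above is cut off right at the line the stack trace points to (UnsafeRowSerializer.scala:75, inside `writeKey`). A plausible reading, offered as an assumption rather than a restatement of the committed line: `writeKey` deliberately drops the key (it is only the map-side partition id), the matching read path hands back a placeholder `null` key for records read back from spill files, and re-serializing those merged records therefore reaches `writeKey` with a `null` that the original `assert(key.isInstanceOf[Int])` rejects. A minimal, self-contained sketch of the before/after check under that assumption:

```scala
// Pre-fix vs. relaxed key check, pulled out into plain functions so the
// difference is runnable in isolation. The relaxed variant accepting null is
// an assumption about the fix, not a verbatim copy of the truncated diff.
object WriteKeyCheck {
  def preFix(key: Any): Unit  = assert(key.isInstanceOf[Int])            // rejects null
  def relaxed(key: Any): Unit = assert(key == null || key.isInstanceOf[Int])

  def main(args: Array[String]): Unit = {
    relaxed(7)     // map-side partition id: fine either way
    relaxed(null)  // key of a record read back from a spill file: now tolerated
    preFix(null)   // throws java.lang.AssertionError, matching the trace above
  }
}
```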

spark git commit: [SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill

2015-09-10 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 5e06d41a4 -> bc70043c8


[SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill

Data Spill with UnsafeRow causes assert failure.

```
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:165)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
```

To reproduce it (thanks andrewor14):
```scala
bin/spark-shell --master local \
  --conf spark.shuffle.memoryFraction=0.005 \
  --conf spark.shuffle.sort.bypassMergeThreshold=0

sc.parallelize(1 to 2 * 1000 * 1000, 10)
  .map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count()
```

Author: Cheng Hao 

Closes #8635 from chenghao-intel/unsafe_spill.

(cherry picked from commit e04811137680f937669cdcc78771227aeb7cd849)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc70043c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc70043c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc70043c

Branch: refs/heads/branch-1.5
Commit: bc70043c8ebdc985ae4a02092b2750c22460d657
Parents: 5e06d41
Author: Cheng Hao 
Authored: Thu Sep 10 11:48:43 2015 -0700
Committer: Andrew Or 
Committed: Thu Sep 10 11:48:50 2015 -0700

--
 .../spark/util/collection/ExternalSorter.scala   |  6 ++
 .../sql/execution/UnsafeRowSerializer.scala      |  2 +-
 .../execution/UnsafeRowSerializerSuite.scala     | 64 ++--
 3 files changed, 67 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bc70043c/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 19287ed..138c05d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -188,6 +188,12 @@ private[spark] class ExternalSorter[K, V, C](
 
   private val spills = new ArrayBuffer[SpilledFile]
 
+  /**
+   * Number of files this sorter has spilled so far.
+   * Exposed for testing.
+   */
+  private[spark] def numSpills: Int = spills.size
+
   override def insertAll(records: Iterator[Product2[K, V]]): Unit = {
     // TODO: stop combining if we find that the reduction factor isn't high
     val shouldCombine = aggregator.isDefined

http://git-wip-us.apache.org/repos/asf/spark/blob/bc70043c/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 5c18558..e060c06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -72,7 +72,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst