spark git commit: [SPARK-10403] Allow UnsafeRowSerializer to work with tungsten-sort ShuffleManager

2015-09-23 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 6c6cadb8f -> 64cc62cb5


[SPARK-10403] Allow UnsafeRowSerializer to work with tungsten-sort 
ShuffleManager

This patch attempts to fix an issue where Spark SQL's UnsafeRowSerializer was 
incompatible with the `tungsten-sort` ShuffleManager.

Author: Josh Rosen 

Closes #8873 from JoshRosen/SPARK-10403.

(cherry picked from commit a18208047f06a4244703c17023bb20cbe1f59d73)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64cc62cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64cc62cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64cc62cb

Branch: refs/heads/branch-1.5
Commit: 64cc62cb5f14dcc4a69073c48fdf3dd61c5df787
Parents: 6c6cadb
Author: Josh Rosen 
Authored: Wed Sep 23 11:31:01 2015 -0700
Committer: Michael Armbrust 
Committed: Wed Sep 23 11:31:14 2015 -0700

--
 .../sql/execution/UnsafeRowSerializer.scala | 22 +--
 .../execution/UnsafeRowSerializerSuite.scala| 23 ++--
 2 files changed, 27 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/64cc62cb/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index e060c06..7e98126 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -45,16 +45,9 @@ private[sql] class UnsafeRowSerializer(numFields: Int) extends Serializer with S
 }
 
 private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance {
-
-  /**
-   * Marks the end of a stream written with [[serializeStream()]].
-   */
-  private[this] val EOF: Int = -1
-
   /**
    * Serializes a stream of UnsafeRows. Within the stream, each record consists of a record
    * length (stored as a 4-byte integer, written high byte first), followed by the record's bytes.
-   * The end of the stream is denoted by a record with the special length `EOF` (-1).
    */
   override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream {
     private[this] var writeBuffer: Array[Byte] = new Array[Byte](4096)
@@ -92,7 +85,6 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
 
     override def close(): Unit = {
       writeBuffer = null
-      dOut.writeInt(EOF)
       dOut.close()
     }
   }
@@ -104,12 +96,20 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
     private[this] var rowBuffer: Array[Byte] = new Array[Byte](1024)
     private[this] var row: UnsafeRow = new UnsafeRow()
     private[this] var rowTuple: (Int, UnsafeRow) = (0, row)
+    private[this] val EOF: Int = -1
 
     override def asKeyValueIterator: Iterator[(Int, UnsafeRow)] = {
       new Iterator[(Int, UnsafeRow)] {
-        private[this] var rowSize: Int = dIn.readInt()
-        if (rowSize == EOF) dIn.close()
 
+        private[this] def readSize(): Int = try {
+          dIn.readInt()
+        } catch {
+          case e: EOFException =>
+            dIn.close()
+            EOF
+        }
+
+        private[this] var rowSize: Int = readSize()
         override def hasNext: Boolean = rowSize != EOF
 
         override def next(): (Int, UnsafeRow) = {
@@ -118,7 +118,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
           }
          ByteStreams.readFully(dIn, rowBuffer, 0, rowSize)
          row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, numFields, rowSize)
-         rowSize = dIn.readInt() // read the next row's size
+         rowSize = readSize()
          if (rowSize == EOF) { // We are returning the last row in this stream
            dIn.close()
            val _rowTuple = rowTuple

http://git-wip-us.apache.org/repos/asf/spark/blob/64cc62cb/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 0113d05..f7d48bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
spark git commit: [SPARK-10403] Allow UnsafeRowSerializer to work with tungsten-sort ShuffleManager

2015-09-23 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 27bfa9ab3 -> a18208047


[SPARK-10403] Allow UnsafeRowSerializer to work with tungsten-sort 
ShuffleManager

This patch attempts to fix an issue where Spark SQL's UnsafeRowSerializer was 
incompatible with the `tungsten-sort` ShuffleManager.

Author: Josh Rosen 

Closes #8873 from JoshRosen/SPARK-10403.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1820804
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1820804
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1820804

Branch: refs/heads/master
Commit: a18208047f06a4244703c17023bb20cbe1f59d73
Parents: 27bfa9a
Author: Josh Rosen 
Authored: Wed Sep 23 11:31:01 2015 -0700
Committer: Michael Armbrust 
Committed: Wed Sep 23 11:31:01 2015 -0700

--
 .../sql/execution/UnsafeRowSerializer.scala | 22 +--
 .../execution/UnsafeRowSerializerSuite.scala| 23 ++--
 2 files changed, 27 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a1820804/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index e060c06..7e98126 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -45,16 +45,9 @@ private[sql] class UnsafeRowSerializer(numFields: Int) extends Serializer with S
 }
 
 private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance {
-
-  /**
-   * Marks the end of a stream written with [[serializeStream()]].
-   */
-  private[this] val EOF: Int = -1
-
   /**
    * Serializes a stream of UnsafeRows. Within the stream, each record consists of a record
    * length (stored as a 4-byte integer, written high byte first), followed by the record's bytes.
-   * The end of the stream is denoted by a record with the special length `EOF` (-1).
    */
   override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream {
     private[this] var writeBuffer: Array[Byte] = new Array[Byte](4096)
@@ -92,7 +85,6 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
 
     override def close(): Unit = {
       writeBuffer = null
-      dOut.writeInt(EOF)
       dOut.close()
     }
   }
@@ -104,12 +96,20 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
     private[this] var rowBuffer: Array[Byte] = new Array[Byte](1024)
     private[this] var row: UnsafeRow = new UnsafeRow()
     private[this] var rowTuple: (Int, UnsafeRow) = (0, row)
+    private[this] val EOF: Int = -1
 
     override def asKeyValueIterator: Iterator[(Int, UnsafeRow)] = {
       new Iterator[(Int, UnsafeRow)] {
-        private[this] var rowSize: Int = dIn.readInt()
-        if (rowSize == EOF) dIn.close()
 
+        private[this] def readSize(): Int = try {
+          dIn.readInt()
+        } catch {
+          case e: EOFException =>
+            dIn.close()
+            EOF
+        }
+
+        private[this] var rowSize: Int = readSize()
         override def hasNext: Boolean = rowSize != EOF
 
         override def next(): (Int, UnsafeRow) = {
@@ -118,7 +118,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
          }
          ByteStreams.readFully(dIn, rowBuffer, 0, rowSize)
          row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, numFields, rowSize)
-         rowSize = dIn.readInt() // read the next row's size
+         rowSize = readSize()
          if (rowSize == EOF) { // We are returning the last row in this stream
            dIn.close()
            val _rowTuple = rowTuple

http://git-wip-us.apache.org/repos/asf/spark/blob/a1820804/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 0113d05..f7d48bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.sql.execution