spark git commit: [SPARK-10403] Allow UnsafeRowSerializer to work with tungsten-sort ShuffleManager
Repository: spark Updated Branches: refs/heads/branch-1.5 6c6cadb8f -> 64cc62cb5 [SPARK-10403] Allow UnsafeRowSerializer to work with tungsten-sort ShuffleManager This patch attempts to fix an issue where Spark SQL's UnsafeRowSerializer was incompatible with the `tungsten-sort` ShuffleManager. Author: Josh Rosen Closes #8873 from JoshRosen/SPARK-10403. (cherry picked from commit a18208047f06a4244703c17023bb20cbe1f59d73) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64cc62cb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64cc62cb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64cc62cb Branch: refs/heads/branch-1.5 Commit: 64cc62cb5f14dcc4a69073c48fdf3dd61c5df787 Parents: 6c6cadb Author: Josh Rosen Authored: Wed Sep 23 11:31:01 2015 -0700 Committer: Michael Armbrust Committed: Wed Sep 23 11:31:14 2015 -0700 -- .../sql/execution/UnsafeRowSerializer.scala | 22 +-- .../execution/UnsafeRowSerializerSuite.scala| 23 ++-- 2 files changed, 27 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/64cc62cb/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala index e060c06..7e98126 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala @@ -45,16 +45,9 @@ private[sql] class UnsafeRowSerializer(numFields: Int) extends Serializer with S } private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance { - - /** - * Marks the end of a stream written with [[serializeStream()]]. - */ - private[this] val EOF: Int = -1 - /** * Serializes a stream of UnsafeRows. 
Within the stream, each record consists of a record * length (stored as a 4-byte integer, written high byte first), followed by the record's bytes. - * The end of the stream is denoted by a record with the special length `EOF` (-1). */ override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream { private[this] var writeBuffer: Array[Byte] = new Array[Byte](4096) @@ -92,7 +85,6 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst override def close(): Unit = { writeBuffer = null - dOut.writeInt(EOF) dOut.close() } } @@ -104,12 +96,20 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst private[this] var rowBuffer: Array[Byte] = new Array[Byte](1024) private[this] var row: UnsafeRow = new UnsafeRow() private[this] var rowTuple: (Int, UnsafeRow) = (0, row) + private[this] val EOF: Int = -1 override def asKeyValueIterator: Iterator[(Int, UnsafeRow)] = { new Iterator[(Int, UnsafeRow)] { - private[this] var rowSize: Int = dIn.readInt() - if (rowSize == EOF) dIn.close() + private[this] def readSize(): Int = try { +dIn.readInt() + } catch { +case e: EOFException => + dIn.close() + EOF + } + + private[this] var rowSize: Int = readSize() override def hasNext: Boolean = rowSize != EOF override def next(): (Int, UnsafeRow) = { @@ -118,7 +118,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst } ByteStreams.readFully(dIn, rowBuffer, 0, rowSize) row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, numFields, rowSize) -rowSize = dIn.readInt() // read the next row's size +rowSize = readSize() if (rowSize == EOF) { // We are returning the last row in this stream dIn.close() val _rowTuple = rowTuple http://git-wip-us.apache.org/repos/asf/spark/blob/64cc62cb/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index 0113d05..f7d48bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++
spark git commit: [SPARK-10403] Allow UnsafeRowSerializer to work with tungsten-sort ShuffleManager
Repository: spark Updated Branches: refs/heads/master 27bfa9ab3 -> a18208047 [SPARK-10403] Allow UnsafeRowSerializer to work with tungsten-sort ShuffleManager This patch attempts to fix an issue where Spark SQL's UnsafeRowSerializer was incompatible with the `tungsten-sort` ShuffleManager. Author: Josh Rosen Closes #8873 from JoshRosen/SPARK-10403. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1820804 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1820804 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1820804 Branch: refs/heads/master Commit: a18208047f06a4244703c17023bb20cbe1f59d73 Parents: 27bfa9a Author: Josh Rosen Authored: Wed Sep 23 11:31:01 2015 -0700 Committer: Michael Armbrust Committed: Wed Sep 23 11:31:01 2015 -0700 -- .../sql/execution/UnsafeRowSerializer.scala | 22 +-- .../execution/UnsafeRowSerializerSuite.scala| 23 ++-- 2 files changed, 27 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a1820804/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala index e060c06..7e98126 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala @@ -45,16 +45,9 @@ private[sql] class UnsafeRowSerializer(numFields: Int) extends Serializer with S } private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance { - - /** - * Marks the end of a stream written with [[serializeStream()]]. - */ - private[this] val EOF: Int = -1 - /** * Serializes a stream of UnsafeRows. 
Within the stream, each record consists of a record * length (stored as a 4-byte integer, written high byte first), followed by the record's bytes. - * The end of the stream is denoted by a record with the special length `EOF` (-1). */ override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream { private[this] var writeBuffer: Array[Byte] = new Array[Byte](4096) @@ -92,7 +85,6 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst override def close(): Unit = { writeBuffer = null - dOut.writeInt(EOF) dOut.close() } } @@ -104,12 +96,20 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst private[this] var rowBuffer: Array[Byte] = new Array[Byte](1024) private[this] var row: UnsafeRow = new UnsafeRow() private[this] var rowTuple: (Int, UnsafeRow) = (0, row) + private[this] val EOF: Int = -1 override def asKeyValueIterator: Iterator[(Int, UnsafeRow)] = { new Iterator[(Int, UnsafeRow)] { - private[this] var rowSize: Int = dIn.readInt() - if (rowSize == EOF) dIn.close() + private[this] def readSize(): Int = try { +dIn.readInt() + } catch { +case e: EOFException => + dIn.close() + EOF + } + + private[this] var rowSize: Int = readSize() override def hasNext: Boolean = rowSize != EOF override def next(): (Int, UnsafeRow) = { @@ -118,7 +118,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst } ByteStreams.readFully(dIn, rowBuffer, 0, rowSize) row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, numFields, rowSize) -rowSize = dIn.readInt() // read the next row's size +rowSize = readSize() if (rowSize == EOF) { // We are returning the last row in this stream dIn.close() val _rowTuple = rowTuple http://git-wip-us.apache.org/repos/asf/spark/blob/a1820804/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index 0113d05..f7d48bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ -17,9 +17,10 @@ package org.apache.spark.sql.execution