spark git commit: [SPARK-17491] Close serialization stream to fix wrong answer bug in putIteratorAsBytes()
Repository: spark
Updated Branches:
  refs/heads/master 86c2d393a -> 8faa5217b

[SPARK-17491] Close serialization stream to fix wrong answer bug in putIteratorAsBytes()

## What changes were proposed in this pull request?

`MemoryStore.putIteratorAsBytes()` may silently lose values when used with `KryoSerializer` because it does not properly close the serialization stream before attempting to deserialize the already-serialized values, which may cause values buffered in Kryo's internal buffers to go unread. This is the root cause of a user-reported "wrong answer" bug in PySpark caching, reported by bennoleslie on the Spark user mailing list in a thread titled "pyspark persist MEMORY_ONLY vs MEMORY_AND_DISK". Because Spark 2.0 automatically uses KryoSerializer for "safe" types (such as byte arrays, primitives, etc.), this misuse of serializers manifested as silent data corruption rather than a StreamCorrupted error (which you might get from JavaSerializer).

The minimal fix, implemented here, is to close the serialization stream before attempting to deserialize the written values. In addition, this patch adds several assertions / precondition checks to prevent misuse of `PartiallySerializedBlock` and `ChunkedByteBufferOutputStream`.

## How was this patch tested?

The original bug was masked by an invalid assertion in the memory store test cases: the old assertion compared two results record-by-record with `zip` but did not first check that the lengths of the two collections were equal, so missing records went unnoticed (a short sketch of this pitfall follows the diff below). The updated test case reproduces this bug. In addition, I added a new `PartiallySerializedBlockSuite` to unit test that component.

Author: Josh Rosen

Closes #15043 from JoshRosen/partially-serialized-block-values-iterator-bugfix.
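The commit message attributes the lost values to records left sitting in Kryo's internal buffer when the underlying stream is read before being closed. A minimal, self-contained sketch of that behavior, using Kryo directly rather than Spark's `SerializationStream` (the object name and record count here are illustrative, not Spark code):

```scala
import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

object KryoBufferingDemo {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    val bos  = new ByteArrayOutputStream()
    val out  = new Output(bos)  // Output maintains its own internal buffer

    (1 to 10000).foreach(i => kryo.writeClassAndObject(out, Integer.valueOf(i)))

    // Reading the underlying stream *before* closing the Output misses any
    // records still sitting in Kryo's internal buffer -- the failure mode
    // described in the commit message.
    val bytesBeforeClose = bos.size()

    out.close()  // flushes the remaining buffered bytes into bos
    val bytesAfterClose = bos.size()

    println(s"before close: $bytesBeforeClose bytes; after close: $bytesAfterClose bytes")
  }
}
```

Deserializing the bytes captured before `close()` would silently drop the trailing records rather than fail, which is why the bug surfaced as a wrong answer instead of an error.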
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8faa5217
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8faa5217
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8faa5217

Branch: refs/heads/master
Commit: 8faa5217b44e8d52eab7eb2d53d0652abaaf43cd
Parents: 86c2d39
Author: Josh Rosen
Authored: Sat Sep 17 11:46:15 2016 -0700
Committer: Josh Rosen
Committed: Sat Sep 17 11:46:15 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/Task.scala |   1 +
 .../spark/storage/memory/MemoryStore.scala      |  89 ++--
 .../spark/util/ByteBufferOutputStream.scala     |  27 ++-
 .../util/io/ChunkedByteBufferOutputStream.scala |  12 +-
 .../apache/spark/storage/MemoryStoreSuite.scala |  34 ++-
 .../storage/PartiallySerializedBlockSuite.scala | 215 +++
 .../PartiallyUnrolledIteratorSuite.scala        |   2 +-
 .../io/ChunkedByteBufferOutputStreamSuite.scala |   8 +
 8 files changed, 344 insertions(+), 44 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/8faa5217/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 35c4daf..1ed36bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -230,6 +230,7 @@ private[spark] object Task {
     dataOut.flush()
     val taskBytes = serializer.serialize(task)
     Utils.writeByteBuffer(taskBytes, out)
+    out.close()
     out.toByteBuffer
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8faa5217/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index ec1b0f7..205d469 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -33,7 +33,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
 import org.apache.spark.unsafe.Platform
-import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
+import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
@@ -277,6 +277,7 @@ private[spark] class MemoryStore(
         "released too much unroll memory")
       Left(new PartiallyUnrolledIterator(
         this,
+        MemoryMode.ON_HEAP,
         unrollMemoryUsedByThisBlock,
         unrolled
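As mentioned in the testing section above, `zip` truncates to the shorter collection, so a record-by-record comparison on its own cannot detect lost records. A standalone sketch of that pitfall and the fix (hypothetical values, not the actual suite code):

```scala
object ZipAssertPitfall extends App {
  val expected = Seq(1, 2, 3)
  val actual   = Seq(1, 2)  // one record was silently lost

  // Old style: zip truncates to the shorter collection, so the
  // per-record comparison passes despite the missing record.
  expected.zip(actual).foreach { case (e, a) => assert(e == a) }

  // Fixed style: checking the sizes first exposes the lost record.
  assert(expected.size == actual.size, "collections differ in length: a record was lost")
}
```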
spark git commit: [SPARK-17491] Close serialization stream to fix wrong answer bug in putIteratorAsBytes()
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a3bba372a -> bec077069

[SPARK-17491] Close serialization stream to fix wrong answer bug in putIteratorAsBytes()

Author: Josh Rosen

Closes #15043 from JoshRosen/partially-serialized-block-values-iterator-bugfix.
(cherry picked from commit 8faa5217b44e8d52eab7eb2d53d0652abaaf43cd)
Signed-off-by: Josh Rosen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bec07706
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bec07706
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bec07706

Branch: refs/heads/branch-2.0
Commit: bec077069af0b3bc22092a0552baf855dfb344ad
Parents: a3bba37
Author: Josh Rosen
Authored: Sat Sep 17 11:46:15 2016 -0700
Committer: Josh Rosen
Committed: Sat Sep 17 11:46:39 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/Task.scala |   1 +
 .../spark/storage/memory/MemoryStore.scala      |  89 ++--
 .../spark/util/ByteBufferOutputStream.scala     |  27 ++-
 .../util/io/ChunkedByteBufferOutputStream.scala |  12 +-
 .../apache/spark/storage/MemoryStoreSuite.scala |  34 ++-
 .../storage/PartiallySerializedBlockSuite.scala | 215 +++
 .../PartiallyUnrolledIteratorSuite.scala        |   2 +-
 .../io/ChunkedByteBufferOutputStreamSuite.scala |   8 +
 8 files changed, 344 insertions(+), 44 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/bec07706/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 35c4daf..1ed36bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -230,6 +230,7 @@ private[spark] object Task {
     dataOut.flush()
     val taskBytes = serializer.serialize(task)
     Utils.writeByteBuffer(taskBytes, out)
+    out.close()
     out.toByteBuffer
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bec07706/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 1230128..161434c 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -33,7 +33,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
 import org.apache.spark.unsafe.Platform
-import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
+import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
@@ -275,6 +275,7 @@ private[spark] class MemoryStore(
         "released too much unroll memory")
       Left(new PartiallyUnrolledIterator(
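The precondition checks mentioned in the commit message guard `ChunkedByteBufferOutputStream`, and the `out.close()` added in Task.scala follows the same pattern: a stream's contents must not be handed out until the stream is closed. A minimal sketch of that pattern (hypothetical class, not Spark's actual implementation):

```scala
import java.io.ByteArrayOutputStream

// Illustrative only: an output stream that refuses to expose its bytes
// until close() has been called, so callers can never observe a buffer
// that upstream serializers have not finished flushing into.
class CloseCheckedOutputStream extends ByteArrayOutputStream {
  private var closed = false

  override def close(): Unit = {
    super.close()
    closed = true
  }

  def toBytes: Array[Byte] = {
    require(closed, "close() must be called before reading the bytes")
    toByteArray
  }
}
```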