spark git commit: [SPARK-17491] Close serialization stream to fix wrong answer bug in putIteratorAsBytes()

2016-09-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 86c2d393a -> 8faa5217b


[SPARK-17491] Close serialization stream to fix wrong answer bug in 
putIteratorAsBytes()

## What changes were proposed in this pull request?

`MemoryStore.putIteratorAsBytes()` may silently lose values when used with 
`KryoSerializer` because it does not properly close the serialization stream 
before attempting to deserialize the already-serialized values, which may cause 
values buffered in Kryo's internal buffers to not be read.

This is the root cause behind a user-reported "wrong answer" bug in PySpark 
caching reported by bennoleslie on the Spark user mailing list in a thread 
titled "pyspark persist MEMORY_ONLY vs MEMORY_AND_DISK". Due to Spark 2.0's 
automatic use of KryoSerializer for "safe" types (such as byte arrays, 
primitives, etc.) this misuse of serializers manifested itself as silent data 
corruption rather than a StreamCorrupted error (which you might get from 
JavaSerializer).

The minimal fix, implemented here, is to close the serialization stream before 
attempting to deserialize written values. In addition, this patch adds several 
additional assertions / precondition checks to prevent misuse of 
`PartiallySerializedBlock` and `ChunkedByteBufferOutputStream`.
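The failure mode can be sketched outside Spark with any buffered output stream; here `BufferedOutputStream` is a hypothetical stand-in for Kryo's internal buffer, not Spark's actual serializer plumbing:

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream}

object UnclosedStreamDemo extends App {
  val sink = new ByteArrayOutputStream()
  // BufferedOutputStream plays the role of Kryo's internal buffer.
  val out = new BufferedOutputStream(sink, 8192)
  out.write(Array.fill[Byte](100)(1))

  // Reading the sink before closing the stream sees only flushed bytes:
  // here none, because all 100 bytes still sit in the buffer.
  println(s"before close: ${sink.size()} bytes")  // 0

  out.close()  // flushes the buffer, as the fix does for the serialization stream
  println(s"after close:  ${sink.size()} bytes")  // 100
}
```

Deserializing from `sink` before the `close()` would silently miss every buffered value, which is exactly the shape of the data loss described above.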

## How was this patch tested?

The original bug was masked by an invalid assert in the memory store test 
cases: the old assert compared two results record-by-record with `zip` but 
didn't first check that the lengths of the two collections were equal, causing 
missing records to go unnoticed. The updated test case reproduced this bug.
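The masking effect of the old assertion can be reproduced with plain collections (hypothetical data, not the suite's actual fixtures):

```scala
object ZipAssertDemo extends App {
  val expected = Seq(1, 2, 3, 4, 5)
  val actual   = Seq(1, 2, 3)  // records silently dropped

  // Old-style check: zip truncates to the shorter collection, so the
  // missing records 4 and 5 are never compared and the check passes.
  val maskedPass = expected.zip(actual).forall { case (e, a) => e == a }
  println(s"zip-only check passes: $maskedPass")        // true: bug masked

  // Fixed check: compare lengths first, then the paired values.
  val lengthsMatch = expected.length == actual.length
  println(s"length check passes: $lengthsMatch")        // false: bug exposed
}
```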

In addition, I added a new `PartiallySerializedBlockSuite` to unit test that 
component.

Author: Josh Rosen 

Closes #15043 from JoshRosen/partially-serialized-block-values-iterator-bugfix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8faa5217
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8faa5217
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8faa5217

Branch: refs/heads/master
Commit: 8faa5217b44e8d52eab7eb2d53d0652abaaf43cd
Parents: 86c2d39
Author: Josh Rosen 
Authored: Sat Sep 17 11:46:15 2016 -0700
Committer: Josh Rosen 
Committed: Sat Sep 17 11:46:15 2016 -0700

--
 .../scala/org/apache/spark/scheduler/Task.scala |   1 +
 .../spark/storage/memory/MemoryStore.scala  |  89 ++--
 .../spark/util/ByteBufferOutputStream.scala |  27 ++-
 .../util/io/ChunkedByteBufferOutputStream.scala |  12 +-
 .../apache/spark/storage/MemoryStoreSuite.scala |  34 ++-
 .../storage/PartiallySerializedBlockSuite.scala | 215 +++
 .../PartiallyUnrolledIteratorSuite.scala|   2 +-
 .../io/ChunkedByteBufferOutputStreamSuite.scala |   8 +
 8 files changed, 344 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8faa5217/core/src/main/scala/org/apache/spark/scheduler/Task.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 35c4daf..1ed36bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -230,6 +230,7 @@ private[spark] object Task {
     dataOut.flush()
     val taskBytes = serializer.serialize(task)
     Utils.writeByteBuffer(taskBytes, out)
+    out.close()
     out.toByteBuffer
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8faa5217/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index ec1b0f7..205d469 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -33,7 +33,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
 import org.apache.spark.unsafe.Platform
-import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
+import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}

@@ -277,6 +277,7 @@ private[spark] class MemoryStore(
         "released too much unroll memory")
       Left(new PartiallyUnrolledIterator(
         this,
+        MemoryMode.ON_HEAP,
         unrollMemoryUsedByThisBlock,
         unrolled 

spark git commit: [SPARK-17491] Close serialization stream to fix wrong answer bug in putIteratorAsBytes()

2016-09-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a3bba372a -> bec077069



(cherry picked from commit 8faa5217b44e8d52eab7eb2d53d0652abaaf43cd)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bec07706
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bec07706
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bec07706

Branch: refs/heads/branch-2.0
Commit: bec077069af0b3bc22092a0552baf855dfb344ad
Parents: a3bba37
Author: Josh Rosen 
Authored: Sat Sep 17 11:46:15 2016 -0700
Committer: Josh Rosen 
Committed: Sat Sep 17 11:46:39 2016 -0700

--
 .../scala/org/apache/spark/scheduler/Task.scala |   1 +
 .../spark/storage/memory/MemoryStore.scala  |  89 ++--
 .../spark/util/ByteBufferOutputStream.scala |  27 ++-
 .../util/io/ChunkedByteBufferOutputStream.scala |  12 +-
 .../apache/spark/storage/MemoryStoreSuite.scala |  34 ++-
 .../storage/PartiallySerializedBlockSuite.scala | 215 +++
 .../PartiallyUnrolledIteratorSuite.scala|   2 +-
 .../io/ChunkedByteBufferOutputStreamSuite.scala |   8 +
 8 files changed, 344 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bec07706/core/src/main/scala/org/apache/spark/scheduler/Task.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 35c4daf..1ed36bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -230,6 +230,7 @@ private[spark] object Task {
     dataOut.flush()
     val taskBytes = serializer.serialize(task)
     Utils.writeByteBuffer(taskBytes, out)
+    out.close()
     out.toByteBuffer
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bec07706/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 1230128..161434c 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -33,7 +33,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
 import org.apache.spark.unsafe.Platform
-import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
+import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}

@@ -275,6 +275,7 @@ private[spark] class MemoryStore(
         "released too much unroll memory")
       Left(new PartiallyUnrolledIterator(