Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5ecd3c23a -> 6e3fd2b98


[SPARK-18617][BACKPORT] Follow-up PR to close "kryo auto pick" feature for Spark Streaming

## What changes were proposed in this pull request?

This is a follow-up PR to backport #16052 to branch-2.0, with the incremental update from #16091.

## How was this patch tested?

Added a new unit test.

cc zsxwing rxin

Author: uncleGen <husty...@gmail.com>

Closes #16096 from uncleGen/branch-2.0-SPARK-18617.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e3fd2b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e3fd2b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e3fd2b9

Branch: refs/heads/branch-2.0
Commit: 6e3fd2b981e36f7f474781f22e606111d6ad13d5
Parents: 5ecd3c2
Author: uncleGen <husty...@gmail.com>
Authored: Thu Dec 1 10:33:51 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Dec 1 10:33:51 2016 -0800

----------------------------------------------------------------------
 .../spark/serializer/SerializerManager.scala    | 16 +++++++----
 .../spark/storage/memory/MemoryStore.scala      |  5 ++--
 .../storage/PartiallySerializedBlockSuite.scala |  6 ++--
 .../spark/streaming/StreamingContextSuite.scala | 29 ++++++++++++++++++++
 4 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6e3fd2b9/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 59bdc88..76b985c 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -72,8 +72,11 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
     primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
   }
 
-  def getSerializer(ct: ClassTag[_]): Serializer = {
-    if (canUseKryo(ct)) {
+  // SPARK-18617: The feature introduced in SPARK-13990 cannot yet be applied to Spark Streaming.
+  // At worst, a streaming job based on `Receiver` mode cannot run properly on Spark 2.x. As a
+  // first step, it is reasonable to disable the `kryo auto pick` feature for streaming blocks.
+  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
+    if (autoPick && canUseKryo(ct)) {
       kryoSerializer
     } else {
       defaultSerializer
@@ -122,7 +125,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
       outputStream: OutputStream,
       values: Iterator[T]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
@@ -138,7 +142,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
       classTag: ClassTag[_]): ChunkedByteBuffer = {
     val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
     val byteStream = new BufferedOutputStream(bbos)
-    val ser = getSerializer(classTag).newInstance()
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = getSerializer(classTag, autoPick).newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
     bbos.toChunkedByteBuffer
   }
@@ -152,7 +157,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
       inputStream: InputStream)
       (classTag: ClassTag[T]): Iterator[T] = {
     val stream = new BufferedInputStream(inputStream)
-    getSerializer(classTag)
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    getSerializer(classTag, autoPick)
       .newInstance()
       .deserializeStream(wrapForCompression(blockId, stream))
       .asIterator.asInstanceOf[Iterator[T]]
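
For illustration only (this sketch is not part of the patch): the new getSerializer(classTag, autoPick) signature lets callers opt out of Kryo auto-picking per block. Receiver-generated blocks carry a StreamBlockId, so they always fall back to the default serializer, while other block types keep the old behavior. The snippet below assumes code living under the org.apache.spark package (SerializerManager is private[spark]), much like the test suites touched by this patch.

    import scala.reflect.ClassTag

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
    import org.apache.spark.storage.{BlockId, RDDBlockId, StreamBlockId}

    val conf = new SparkConf()
    val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)

    def serializerFor(blockId: BlockId, ct: ClassTag[_]) = {
      // Kryo auto-pick is suppressed only for receiver-generated StreamBlockId blocks.
      val autoPick = !blockId.isInstanceOf[StreamBlockId]
      serializerManager.getSerializer(ct, autoPick)
    }

    // A String-typed RDD block may still auto-pick Kryo ...
    serializerFor(RDDBlockId(rddId = 0, splitIndex = 0), implicitly[ClassTag[String]])
    // ... while a streaming receiver block always uses the default serializer.
    serializerFor(StreamBlockId(streamId = 0, uniqueId = 1L), implicitly[ClassTag[String]])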

http://git-wip-us.apache.org/repos/asf/spark/blob/6e3fd2b9/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 9b87c42..68dff85 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
-import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
@@ -332,7 +332,8 @@ private[spark] class MemoryStore(
     val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
     redirectableStream.setOutputStream(bbos)
     val serializationStream: SerializationStream = {
-      val ser = serializerManager.getSerializer(classTag).newInstance()
+      val autoPick = !blockId.isInstanceOf[StreamBlockId]
+      val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
       ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6e3fd2b9/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
index ec4f263..3050f9a 100644
--- a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
@@ -67,7 +67,8 @@ class PartiallySerializedBlockSuite
       spy
     }
 
-    val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val serializer = serializerManager
+      .getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
     val redirectableOutputStream = Mockito.spy(new RedirectableOutputStream)
     redirectableOutputStream.setOutputStream(bbos)
     val serializationStream = Mockito.spy(serializer.serializeStream(redirectableOutputStream))
@@ -182,7 +183,8 @@ class PartiallySerializedBlockSuite
       Mockito.verifyNoMoreInteractions(memoryStore)
       Mockito.verify(partiallySerializedBlock.getUnrolledChunkedByteBuffer, atLeastOnce).dispose()
 
-      val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+      val serializer = serializerManager
+        .getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
       val deserialized =
         serializer.deserializeStream(new ByteBufferInputStream(bbos.toByteBuffer)).asIterator.toSeq
       assert(deserialized === items)

http://git-wip-us.apache.org/repos/asf/spark/blob/6e3fd2b9/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f1482e5..35eeb9d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming
 
 import java.io.{File, NotSerializableException}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.ArrayBuffer
@@ -806,6 +807,34 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     ssc.stop()
   }
 
+  test("SPARK-18560 Receiver data should be deserialized properly.") {
+    // Start a two-node cluster, so the receiver will use one node and Spark jobs will use the
+    // other one. Spark jobs then need to fetch remote blocks, which triggers SPARK-18560.
+    val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
+    ssc = new StreamingContext(conf, Milliseconds(100))
+    val input = ssc.receiverStream(new TestReceiver)
+    val latch = new CountDownLatch(1)
+    input.count().foreachRDD { rdd =>
+      // Make sure we can read from BlockRDD
+      if (rdd.collect().headOption.getOrElse(0L) > 0) {
+        // Stop StreamingContext to unblock "awaitTerminationOrTimeout"
+        new Thread() {
+          setDaemon(true)
+          override def run(): Unit = {
+            ssc.stop(stopSparkContext = true, stopGracefully = false)
+            latch.countDown()
+          }
+        }.start()
+      }
+    }
+    ssc.start()
+    ssc.awaitTerminationOrTimeout(60000)
+    // Wait until `ssc.stop` returns. Otherwise, we may finish this test too fast and leak an
+    // active SparkContext. Note: the stop code in `after` will just do nothing if `ssc.stop`
+    // in this test is still running.
+    assert(latch.await(60, TimeUnit.SECONDS))
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
     val input = (1 to 100).map(i => 1 to i)
     val inputStream = new TestInputStream(s, input, 1)

