Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5ecd3c23a -> 6e3fd2b98
[SPARK-18617][BACKPORT] Follow-up PR to close the "kryo auto pick" feature for Spark Streaming

## What changes were proposed in this pull request?

This is a follow-up PR to backport #16052 to branch-2.0, with the incremental update in #16091.

## How was this patch tested?

New unit test.

cc zsxwing rxin

Author: uncleGen <husty...@gmail.com>

Closes #16096 from uncleGen/branch-2.0-SPARK-18617.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e3fd2b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e3fd2b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e3fd2b9

Branch: refs/heads/branch-2.0
Commit: 6e3fd2b981e36f7f474781f22e606111d6ad13d5
Parents: 5ecd3c2
Author: uncleGen <husty...@gmail.com>
Authored: Thu Dec 1 10:33:51 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Dec 1 10:33:51 2016 -0800

----------------------------------------------------------------------
 .../spark/serializer/SerializerManager.scala    | 16 +++++++----
 .../spark/storage/memory/MemoryStore.scala      |  5 ++--
 .../storage/PartiallySerializedBlockSuite.scala |  6 ++--
 .../spark/streaming/StreamingContextSuite.scala | 29 ++++++++++++++++++++
 4 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6e3fd2b9/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 59bdc88..76b985c 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -72,8 +72,11 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
     primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
   }
 
-  def getSerializer(ct: ClassTag[_]): Serializer = {
-    if (canUseKryo(ct)) {
+  // SPARK-18617: The feature added in SPARK-13990 cannot be applied to Spark Streaming yet; at
+  // worst, a `Receiver`-based streaming job cannot run properly on Spark 2.x. Disabling the
+  // `kryo auto pick` feature for streaming is a reasonable first step.
+  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
+    if (autoPick && canUseKryo(ct)) {
       kryoSerializer
     } else {
       defaultSerializer
@@ -122,7 +125,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
       outputStream: OutputStream,
       values: Iterator[T]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
@@ -138,7 +142,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
       classTag: ClassTag[_]): ChunkedByteBuffer = {
     val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
     val byteStream = new BufferedOutputStream(bbos)
-    val ser = getSerializer(classTag).newInstance()
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = getSerializer(classTag, autoPick).newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
     bbos.toChunkedByteBuffer
   }
@@ -152,7 +157,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
       inputStream: InputStream)
       (classTag: ClassTag[T]): Iterator[T] = {
     val stream = new BufferedInputStream(inputStream)
-    getSerializer(classTag)
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    getSerializer(classTag, autoPick)
       .newInstance()
       .deserializeStream(wrapForCompression(blockId, stream))
       .asIterator.asInstanceOf[Iterator[T]]


http://git-wip-us.apache.org/repos/asf/spark/blob/6e3fd2b9/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 9b87c42..68dff85 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
-import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
@@ -332,7 +332,8 @@ private[spark] class MemoryStore(
     val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
     redirectableStream.setOutputStream(bbos)
     val serializationStream: SerializationStream = {
-      val ser = serializerManager.getSerializer(classTag).newInstance()
+      val autoPick = !blockId.isInstanceOf[StreamBlockId]
+      val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
       ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
     }
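The mechanics of the fix are easy to miss inside the diff: `getSerializer` now takes an
`autoPick` flag, and every call site derives it from the block type, so a `StreamBlockId`
(a receiver-written block) always falls back to the default serializer instead of the
auto-picked Kryo one. Roughly, SPARK-18560 was hit because the receiver path stores blocks
without precise element-type information, so the writer and a reader holding a concrete
class tag could auto-pick different serializers. Below is a standalone sketch of the new
selection logic; the types and names are stand-ins for illustration only, since the real
`SerializerManager` is `private[spark]`:

    import scala.reflect.{classTag, ClassTag}

    // Stand-ins for the real block id hierarchy, for illustration only.
    sealed trait BlockId
    case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
    case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId

    object SerializerChoiceSketch {
      // Rough stand-in for SerializerManager.canUseKryo: primitives,
      // primitive arrays, and String are eligible for auto-picked Kryo.
      private def canUseKryo(ct: ClassTag[_]): Boolean = {
        val c = ct.runtimeClass
        c.isPrimitive || (c.isArray && c.getComponentType.isPrimitive) || c == classOf[String]
      }

      // Mirrors the patched getSerializer(ct, autoPick): Kryo is only
      // auto-picked when the block is NOT a StreamBlockId.
      def pick(ct: ClassTag[_], blockId: BlockId): String = {
        val autoPick = !blockId.isInstanceOf[StreamBlockId]
        if (autoPick && canUseKryo(ct)) "kryo" else "default"
      }
    }

    object Demo extends App {
      println(SerializerChoiceSketch.pick(classTag[String], RDDBlockId(0, 0)))    // kryo
      println(SerializerChoiceSketch.pick(classTag[String], StreamBlockId(0, 0))) // default
    }

The key property is that the write path and the read path now always agree for streaming
blocks, regardless of which class tag each side holds.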
http://git-wip-us.apache.org/repos/asf/spark/blob/6e3fd2b9/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
index ec4f263..3050f9a 100644
--- a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
@@ -67,7 +67,8 @@ class PartiallySerializedBlockSuite
       spy
     }
 
-    val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val serializer = serializerManager
+      .getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
     val redirectableOutputStream = Mockito.spy(new RedirectableOutputStream)
     redirectableOutputStream.setOutputStream(bbos)
     val serializationStream = Mockito.spy(serializer.serializeStream(redirectableOutputStream))
@@ -182,7 +183,8 @@ class PartiallySerializedBlockSuite
     Mockito.verifyNoMoreInteractions(memoryStore)
     Mockito.verify(partiallySerializedBlock.getUnrolledChunkedByteBuffer, atLeastOnce).dispose()
 
-    val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val serializer = serializerManager
+      .getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
     val deserialized =
       serializer.deserializeStream(new ByteBufferInputStream(bbos.toByteBuffer)).asIterator.toSeq
     assert(deserialized === items)


http://git-wip-us.apache.org/repos/asf/spark/blob/6e3fd2b9/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f1482e5..35eeb9d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.streaming
 import java.io.{File, NotSerializableException}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.ArrayBuffer
@@ -806,6 +807,34 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     ssc.stop()
   }
 
+  test("SPARK-18560 Receiver data should be deserialized properly.") {
+    // Start a two-node cluster, so the receiver uses one node and Spark jobs use the other.
+    // Spark jobs then have to fetch remote blocks, which triggers SPARK-18560.
+    val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
+    ssc = new StreamingContext(conf, Milliseconds(100))
+    val input = ssc.receiverStream(new TestReceiver)
+    val latch = new CountDownLatch(1)
+    input.count().foreachRDD { rdd =>
+      // Make sure we can read from BlockRDD
+      if (rdd.collect().headOption.getOrElse(0L) > 0) {
+        // Stop StreamingContext to unblock "awaitTerminationOrTimeout"
+        new Thread() {
+          setDaemon(true)
+          override def run(): Unit = {
+            ssc.stop(stopSparkContext = true, stopGracefully = false)
+            latch.countDown()
+          }
+        }.start()
+      }
+    }
+    ssc.start()
+    ssc.awaitTerminationOrTimeout(60000)
+    // Wait until `ssc.stop` returns. Otherwise, we may finish this test too fast and leak an
+    // active SparkContext. Note: the stop code in `after` does nothing while `ssc.stop` in this
+    // test is still running.
+    assert(latch.await(60, TimeUnit.SECONDS))
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
     val input = (1 to 100).map(i => 1 to i)
     val inputStream = new TestInputStream(s, input, 1)
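One detail of this test is worth calling out: `ssc.stop` runs on a daemon thread rather
than inline in the `foreachRDD` callback, because stopping the context from inside a batch
callback can block the callback itself, and the latch keeps the test from returning while
`stop` is still in flight. A minimal sketch of the same shutdown pattern; the object and
method names here are illustrative, not part of the patch:

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    import org.apache.spark.streaming.StreamingContext

    object ShutdownSketch {
      // Stop the StreamingContext asynchronously and hand back a latch the
      // caller can await, so the test does not leak an active SparkContext.
      def stopAsync(ssc: StreamingContext): CountDownLatch = {
        val latch = new CountDownLatch(1)
        val stopper = new Thread {
          setDaemon(true)
          override def run(): Unit = {
            ssc.stop(stopSparkContext = true, stopGracefully = false)
            latch.countDown()
          }
        }
        stopper.start()
        latch
      }
    }

A caller would follow `ssc.awaitTerminationOrTimeout(60000)` with
`assert(latch.await(60, TimeUnit.SECONDS))`, exactly as the test above does.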
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org