This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 37a0ae3511c [SPARK-43138][CORE] Fix ClassNotFoundException during migration
37a0ae3511c is described below

commit 37a0ae3511c9f153537d5928e9938f72763f5464
Author: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
AuthorDate: Thu May 11 08:25:45 2023 -0500

    [SPARK-43138][CORE] Fix ClassNotFoundException during migration
    
    ### What changes were proposed in this pull request?
    
    This PR fixes an unhandled ClassNotFoundException during RDD block decommission migrations.
    ```
    2023-04-08 04:15:11,791 ERROR server.TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 6425687122551756860
    java.lang.ClassNotFoundException: com.class.from.user.jar.ClassName
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Class.java:398)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71)
        at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
        at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
        at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream.java:1833)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1658)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:123)
        at org.apache.spark.network.netty.NettyBlockRpcServer.deserializeMetadata(NettyBlockRpcServer.scala:180)
        at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:119)
        at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
    ```
    The exception occurs when an RDD block contains a user-defined class, during the
    serialization of the `ClassTag` for that class. The problem is that for serialization
    of the `ClassTag` an instance of
    `JavaSerializer` (https://github.com/apache/spark/blob/ca2ddf3c2079dda93053e64070ebda1610aa1968/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala#L62)
    is used, but it is never configured to use a class loader that includes user-defined
    classes. This PR solves the issue by inst [...]
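    
    As a rough illustration (a minimal sketch in plain Scala, not code from this patch),
    the difference is whether the serializer ever receives a class loader that can see
    user classes; only the serializer owned by SparkEnv is wired up that way on executors:
    
    ```
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.JavaSerializer
    
    val conf = new SparkConf()
    
    // Before this change: a private serializer that never learns about user classes,
    // so its ObjectInputStream#resolveClass only sees the application class loader.
    val isolatedSerializer = new JavaSerializer(conf)
    
    // Simplified version of what happens to the SparkEnv-owned serializer on an
    // executor: it is explicitly given a class loader that can see the user's jars
    // (here the context class loader is used purely as an illustrative stand-in).
    val userClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader
    val envSerializer = new JavaSerializer(conf).setDefaultClassLoader(userClassLoader)
    ```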
    
    The reason this does not occur during normal block replication, but only during
    decommission, is that there is a workaround/hack in `BlockManager.doPutIterator`
    that replaces the `ClassTag` with a `ClassTag[Any]` when replicating the block:
    https://github.com/apache/spark/blob/ca2ddf3c2079dda93053e64070ebda1610aa1968/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1657-L1664
    But during RDD migration (and probably pro-active replication) it will use a
    different codepa [...]
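    
    A minimal sketch of why that erasure sidesteps the crash (illustrative only;
    `UserClass` stands in for a class that only exists in the user jar or REPL):
    deserializing a typed `ClassTag` requires loading the referenced class, while a
    `ClassTag[Any]` only references Scala library classes.
    
    ```
    import scala.reflect.{classTag, ClassTag}
    
    case class UserClass(x: Int) // stand-in for a class only present in the user jar / REPL
    
    // Java-serializing this tag embeds UserClass, so the receiving side must be able
    // to load it when deserializing the block-upload metadata.
    val typedTag: ClassTag[UserClass] = classTag[UserClass]
    
    // The SPARK-16550 workaround sent this instead, which any class loader can resolve.
    val erasedTag: ClassTag[Any] = classTag[Any]
    ```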
    
    ### Why are the changes needed?
    The unhandled exception means that block replication does not work properly.
    Specifically, in cases where a block contains a user-defined class and is not
    replicated at creation, the block will never be successfully migrated during
    decommission.
    
    ### Does this PR introduce _any_ user-facing change?
    It fixes the bug. But also, since this changes from a fixed `JavaSerializer` to
    using the `SerializerManager`, the `NettyBlockTransferService` might now use
    `KryoSerializer` or some other user-configured serializer for the metadata.
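    
    For example (a minimal sketch, assuming a default setup), a user-provided serializer
    configuration like the following would now also govern serialization of the
    block-upload metadata, because the `SerializerManager` default serializer is the one
    SparkEnv builds from `spark.serializer`:
    
    ```
    import org.apache.spark.SparkConf
    
    // Illustrative configuration: with this PR the metadata serializer follows
    // spark.serializer instead of always being a JavaSerializer.
    val conf = new SparkConf()
      .setAppName("example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    ```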
    
    ### How was this patch tested?
    This modifies an existing spec to correctly check that replication happens for
    REPL-defined classes, while removing the hack that erases the `ClassTag`.
    Additionally, I tested this manually on a Hadoop cluster to check that it also
    solves the decommission migration issue. If someone can point me to a better way
    to add a spec using user-defined classes, I would also like to add a unit test
    for it.
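    
    For reference, a rough manual repro sketch of the decommission scenario
    (spark-shell style, so `sc` exists; assumes storage decommissioning is enabled,
    e.g. `spark.storage.decommission.enabled` and
    `spark.storage.decommission.rddBlocks.enabled`):
    
    ```
    import org.apache.spark.storage.StorageLevel
    
    case class UserDefined(i: Int) // stands in for a class only present in the user jar / REPL
    
    val rdd = sc.parallelize(1 to 100).map(UserDefined).persist(StorageLevel.MEMORY_ONLY)
    // Cache blocks on executors; before this fix, migrating these RDD blocks off a
    // decommissioning executor failed with the ClassNotFoundException shown above.
    rdd.count()
    ```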
    
    Closes #40808 from eejbyfeldt/SPARK-43138-class-not-found-exception.
    
    Authored-by: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../src/main/scala/org/apache/spark/SparkEnv.scala |  4 ++--
 .../network/netty/NettyBlockTransferService.scala  |  5 +++--
 .../org/apache/spark/storage/BlockManager.scala    | 10 +--------
 .../netty/NettyBlockTransferSecuritySuite.scala    | 10 ++++++---
 .../netty/NettyBlockTransferServiceSuite.scala     |  6 ++++--
 .../storage/BlockManagerReplicationSuite.scala     |  5 +++--
 .../apache/spark/storage/BlockManagerSuite.scala   | 25 ++++++++++++----------
 .../org/apache/spark/repl/SingletonReplSuite.scala | 14 ++++++++----
 .../streaming/ReceivedBlockHandlerSuite.scala      |  3 ++-
 9 files changed, 46 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 28ab9dc0742..272a0a6332b 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -363,8 +363,8 @@ object SparkEnv extends Logging {
       isDriver)
 
     val blockTransferService =
-      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
-        blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
+      new NettyBlockTransferService(conf, securityManager, serializerManager, bindAddress,
+        advertiseAddress, blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
 
     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index a418cb2bf44..d04d2eeef0b 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -40,7 +40,7 @@ import org.apache.spark.network.shuffle.{BlockFetchingListener, BlockTransferLis
 import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.storage.BlockManagerMessages.IsExecutorAlive
 import org.apache.spark.util.Utils
@@ -51,6 +51,7 @@ import org.apache.spark.util.Utils
 private[spark] class NettyBlockTransferService(
     conf: SparkConf,
     securityManager: SecurityManager,
+    serializerManager: SerializerManager,
     bindAddress: String,
     override val hostName: String,
     _port: Int,
@@ -59,7 +60,7 @@ private[spark] class NettyBlockTransferService(
   extends BlockTransferService {
 
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
-  private val serializer = new JavaSerializer(conf)
+  private val serializer = serializerManager.getSerializer(scala.reflect.classTag[Any], false)
   private val authEnabled = securityManager.isAuthenticationEnabled()
 
   private[this] var transportContext: TransportContext = _
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a8f74ef179b..b4453b4d35e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1654,16 +1654,8 @@ private[spark] class BlockManager(
         if (level.replication > 1) {
           val remoteStartTimeNs = System.nanoTime()
           val bytesToReplicate = doGetLocalBytes(blockId, info)
-          // [SPARK-16550] Erase the typed classTag when using default serialization, since
-          // NettyBlockRpcServer crashes when deserializing repl-defined classes.
-          // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
-          val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
-            scala.reflect.classTag[Any]
-          } else {
-            classTag
-          }
           try {
-            replicate(blockId, bytesToReplicate, level, remoteClassTag)
+            replicate(blockId, bytesToReplicate, level, classTag)
           } finally {
             bytesToReplicate.dispose()
           }
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 13bb811b840..85b05cd5f98 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -38,6 +38,7 @@ import org.apache.spark.internal.config.Network
 import org.apache.spark.network.{BlockDataManager, BlockTransferService}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.storage.{BlockId, ShuffleBlockId}
 import org.apache.spark.util.ThreadUtils
 
@@ -126,13 +127,16 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
     when(blockManager.getLocalBlockData(blockId)).thenReturn(blockBuffer)
 
     val securityManager0 = new SecurityManager(conf0)
-    val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", "localhost", 0,
+    val serializerManager0 = new SerializerManager(new JavaSerializer(conf0), conf0)
+    val exec0 = new NettyBlockTransferService(
+      conf0, securityManager0, serializerManager0, "localhost", "localhost", 0,
       1)
     exec0.init(blockManager)
 
     val securityManager1 = new SecurityManager(conf1)
-    val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", "localhost", 0,
-      1)
+    val serializerManager1 = new SerializerManager(new JavaSerializer(conf1), conf1)
+    val exec1 = new NettyBlockTransferService(
+      conf1, securityManager1, serializerManager1, "localhost", "localhost", 0, 1)
     exec1.init(blockManager)
 
     val result = fetchBlock(exec0, exec1, "1", blockId) match {
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
index 3a6bc47257f..62105f1d514 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.network.BlockDataManager
 import org.apache.spark.network.client.{TransportClient, TransportClientFactory}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 
 class NettyBlockTransferServiceSuite
   extends SparkFunSuite
@@ -142,10 +143,11 @@ class NettyBlockTransferServiceSuite
       rpcEndpointRef: RpcEndpointRef = null): NettyBlockTransferService = {
     val conf = new SparkConf()
       .set("spark.app.id", s"test-${getClass.getName}")
+    val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
     val securityManager = new SecurityManager(conf)
     val blockDataManager = mock(classOf[BlockDataManager])
-    val service = new NettyBlockTransferService(conf, securityManager, "localhost", "localhost",
-      port, 1, rpcEndpointRef)
+    val service = new NettyBlockTransferService(
+      conf, securityManager, serializerManager, "localhost", "localhost", port, 1, rpcEndpointRef)
     service.init(blockDataManager)
     service
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 14e1ee5b09d..8729ae1edfb 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -75,9 +75,10 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
       memoryManager: Option[UnifiedMemoryManager] = None): BlockManager = {
     conf.set(TEST_MEMORY, maxMem)
     conf.set(MEMORY_OFFHEAP_SIZE, maxMem)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1))
     val serializerManager = new SerializerManager(serializer, conf)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
+    val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1))
     val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None)
     memManager.setMemoryStore(store.memoryStore)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cc1c01d80cb..29592434765 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -131,10 +131,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
       None
     }
     val bmSecurityMgr = new SecurityManager(bmConf, encryptionKey)
-    val transfer = transferService
-      .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1))
-    val memManager = UnifiedMemoryManager(bmConf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, bmConf)
+    val transfer = transferService.getOrElse(new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1))
+    val memManager = UnifiedMemoryManager(bmConf, numCores = 1)
     val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
       val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 0)
       Some(new ExternalBlockStoreClient(transConf, bmSecurityMgr,
@@ -1308,9 +1308,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
     conf.set(TEST_MEMORY, 1200L)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
+    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
@@ -1357,8 +1358,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
         if (conf.get(IO_ENCRYPTION_ENABLED)) Some(CryptoStreamUtils.createKey(conf)) else None
       val securityMgr = new SecurityManager(conf, ioEncryptionKey)
       val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
-      val transfer =
-        new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
+      val transfer = new NettyBlockTransferService(
+        conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
       val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
       val blockManager = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
         serializerManager, conf, memoryManager, mapOutputTracker,
@@ -2193,9 +2194,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     case class User(id: Long, name: String)
 
     conf.set(TEST_MEMORY, 1200L)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(kryoSerializerWithDiskCorruptedInputStream, conf)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
+    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
@@ -2216,9 +2218,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
       = createKryoSerializerWithDiskCorruptedInputStream()
 
     conf.set(TEST_MEMORY, 1200L)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(kryoSerializerWithDiskCorruptedInputStream, conf)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
+    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
diff --git a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
index 4795306692f..0e3bfcfa89d 100644
--- a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
@@ -344,13 +344,19 @@ class SingletonReplSuite extends SparkFunSuite {
         |}
         |import org.apache.spark.storage.StorageLevel._
         |case class Foo(i: Int)
-        |val ret = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_AND_DISK_2)
-        |ret.count()
-        |val res = sc.getRDDStorageInfo.filter(_.id == ret.id).map(_.numCachedPartitions).sum
+        |val rdd1 = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_ONLY)
+        |val rdd2 = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_ONLY_2)
+        |rdd1.count()
+        |rdd2.count()
+        |val cached1 = sc.getRDDStorageInfo.filter(_.id == rdd1.id).map(_.numCachedPartitions).sum
+        |val size1 = sc.getRDDStorageInfo.filter(_.id == rdd1.id).map(_.memSize).sum
+        |val size2 = sc.getRDDStorageInfo.filter(_.id == rdd2.id).map(_.memSize).sum
+        |assert(size2 == size1 * 2, s"Blocks not replicated properly size1=$size1, size2=$size2")
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
-    assertContains("res: Int = 10", output)
+    assertContains("cached1: Int = 10", output)
+    assertDoesNotContain("AssertionError", output)
   }
 
   test("should clone and clean line object in ClosureCleaner") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index dcf82d5e2c2..1913552ceed 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -288,7 +288,8 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
       conf: SparkConf,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem / 2, 1)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
     val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None)
     memManager.setMemoryStore(blockManager.memoryStore)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
