Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2940#discussion_r19507777
  
    --- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
    @@ -0,0 +1,221 @@
    +package org.apache.spark.streaming
    +
    +import java.io.File
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    +import org.scalatest.concurrent.Eventually._
    +
    +import akka.actor.{ActorSystem, Props}
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +
    +import org.apache.spark._
    +import org.apache.spark.network.nio.NioBlockTransferService
    +import org.apache.spark.scheduler.LiveListenerBus
    +import org.apache.spark.serializer.KryoSerializer
    +import org.apache.spark.shuffle.hash.HashShuffleManager
    +import org.apache.spark.storage._
    +import org.apache.spark.streaming.util._
    +import org.apache.spark.streaming.receiver._
    +import org.apache.spark.util.AkkaUtils
    +import WriteAheadLogBasedBlockHandler._
    +import WriteAheadLogSuite._
    +
    +class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with 
Matchers with Logging {
    +
    +  val conf = new 
SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
    +  val hadoopConf = new Configuration()
    +  val storageLevel = StorageLevel.MEMORY_ONLY_SER
    +  val streamId = 1
    +  val securityMgr = new SecurityManager(conf)
    +  val mapOutputTracker = new MapOutputTrackerMaster(conf)
    +  val shuffleManager = new HashShuffleManager(conf)
    +  val serializer = new KryoSerializer(conf)
    +  val manualClock = new ManualClock
    +  val blockManagerSize = 10000000
    +
    +  var actorSystem: ActorSystem = null
    +  var blockManagerMaster: BlockManagerMaster = null
    +  var blockManager: BlockManager = null
    +  var receivedBlockHandler: ReceivedBlockHandler = null
    +  var tempDirectory: File = null
    +
    +  before {
    +    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
    +      "test", "localhost", 0, conf = conf, securityManager = securityMgr)
    +    this.actorSystem = actorSystem
    +    conf.set("spark.driver.port", boundPort.toString)
    +
    +    blockManagerMaster = new BlockManagerMaster(
    +      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, 
new LiveListenerBus))),
    +      conf, true)
    +
    +    blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, 
serializer,
    +      blockManagerSize, conf, mapOutputTracker, shuffleManager,
    +      new NioBlockTransferService(conf, securityMgr))
    +
    +    tempDirectory = Files.createTempDir()
    +    manualClock.setTime(0)
    +  }
    +
    +  after {
    +    if (receivedBlockHandler != null) {
    +      if 
(receivedBlockHandler.isInstanceOf[WriteAheadLogBasedBlockHandler]) {
    +        
receivedBlockHandler.asInstanceOf[WriteAheadLogBasedBlockHandler].stop()
    +      }
    +    }
    +    if (blockManager != null) {
    +      blockManager.stop()
    +      blockManager = null
    +    }
    +    if (blockManagerMaster != null) {
    +      blockManagerMaster.stop()
    +      blockManagerMaster = null
    +    }
    +    actorSystem.shutdown()
    +    actorSystem.awaitTermination()
    +    actorSystem = null
    +
    +    if (tempDirectory != null && tempDirectory.exists()) {
    +      FileUtils.deleteDirectory(tempDirectory)
    +      tempDirectory = null
    +    }
    +  }
    +
    +  test("BlockManagerBasedBlockHandler - store blocks") {
    +    createBlockManagerBasedBlockHandler()
    --- End diff --
    
    That is most probably easier to read. Let me try that out. Thanks for the 
idea!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to