Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1751#discussion_r15784701
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -17,28 +17,210 @@
     
     package org.apache.spark.streaming.kafka
     
    -import kafka.serializer.StringDecoder
    +import java.io.File
    +import java.net.InetSocketAddress
    +import java.util.{Properties, Random}
    +
    +import scala.collection.mutable
    +
    +import kafka.admin.CreateTopicCommand
    +import kafka.common.TopicAndPartition
    +import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
    +import kafka.utils.ZKStringSerializer
    +import kafka.serializer.{StringDecoder, StringEncoder}
    +import kafka.server.{KafkaConfig, KafkaServer}
    +
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.zookeeper.server.ZooKeeperServer
    +import org.apache.zookeeper.server.NIOServerCnxnFactory
    +
     import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
     import org.apache.spark.storage.StorageLevel
    -import org.apache.spark.streaming.dstream.ReceiverInputDStream
     
     class KafkaStreamSuite extends TestSuiteBase {
    +  import KafkaStreamSuite._
    +
    +  val zkConnect = "localhost:2181"
    +  val zkConnectionTimeout = 6000
    +  val zkSessionTimeout = 6000
    +
    +  val brokerPort = 9092
    +  val brokerProps = getBrokerConfig(brokerPort)
    +  val brokerConf = new KafkaConfig(brokerProps)
    +
    +  protected var zookeeper: EmbeddedZookeeper = _
    +  protected var zkClient: ZkClient = _
    +  protected var server: KafkaServer = _
    +  protected var producer: Producer[String, String] = _
    +
    +  override def useManualClock = false
    +
    +  override def beforeFunction() {
    +    // Zookeeper server startup
    +    zookeeper = new EmbeddedZookeeper(zkConnect)
    +    logInfo("==================== 0 ====================")
    +    zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
    +    logInfo("==================== 1 ====================")
    +
    +    // Kafka broker startup
    +    server = new KafkaServer(brokerConf)
    +    logInfo("==================== 2 ====================")
    +    server.startup()
    +    logInfo("==================== 3 ====================")
    +    Thread.sleep(2000)
    +    logInfo("==================== 4 ====================")
    +    super.beforeFunction()
    +  }
     
    -  test("kafka input stream") {
    +  override def afterFunction() {
    +    producer.close()
    +    server.shutdown()
    +    brokerConf.logDirs.foreach { f => deleteDir(new File(f)) }
    +
    +    zkClient.close()
    +    zookeeper.shutdown()
    +
    +    super.afterFunction()
    +  }
    +
    +  test("Kafka input stream") {
         val ssc = new StreamingContext(master, framework, batchDuration)
    -    val topics = Map("my-topic" -> 1)
    -
    -    // tests the API, does not actually test data receiving
    -    val test1: ReceiverInputDStream[(String, String)] =
    -      KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
    -    val test2: ReceiverInputDStream[(String, String)] =
    -      KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
    -    val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
    -    val test3: ReceiverInputDStream[(String, String)] =
    -      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    -      ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
    -
    -    // TODO: Actually test receiving data
    +    val topic = "topic1"
    +    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
    +    createTopic(topic)
    +    produceAndSendMessage(topic, sent)
    +
    +    val kafkaParams = Map("zookeeper.connect" -> zkConnect,
    +      "group.id" -> s"test-consumer-${random.nextInt(10000)}",
    +      "auto.offset.reset" -> "smallest")
    +
    +    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    +      ssc,
    +      kafkaParams,
    +      Map(topic -> 1),
    +      StorageLevel.MEMORY_ONLY)
    +    val result = new mutable.HashMap[String, Long]()
    +    stream.map { case (k, v) => v }
    +      .countByValue()
    +      .foreachRDD { r =>
    +        val ret = r.collect()
    +        ret.toMap.foreach { kv =>
    +          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    +          result.put(kv._1, count)
    +        }
    +      }
    +    ssc.start()
    +    ssc.awaitTermination(3000)
    +
    +    assert(sent.size === result.size)
    +    sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
    +
         ssc.stop()
       }
    +
    +  private def getBrokerConfig(port: Int): Properties = {
    +    val props = new Properties()
    +    props.put("broker.id", "0")
    +    props.put("host.name", "localhost")
    +    props.put("port", port.toString)
    +    props.put("log.dir", createTmpDir().getAbsolutePath)
    +    props.put("zookeeper.connect", zkConnect)
    +    props.put("log.flush.interval.messages", "1")
    +    props.put("replica.socket.timeout.ms", "1500")
    +    props
    +  }
    +
    +  private def getProducerConfig(brokerList: String): Properties = {
    +    val props = new Properties()
    +    props.put("metadata.broker.list", brokerList)
    +    props.put("serializer.class", classOf[StringEncoder].getName)
    +    props
    +  }
    +
    +  private def createTestMessage(topic: String, sent: Map[String, Int])
    +    : Seq[KeyedMessage[String, String]] = {
    +    val messages = for ((s, freq) <- sent; i <- 0 until freq) yield {
    +      new KeyedMessage[String, String](topic, s)
    +    }
    +    messages.toSeq
    +  }
    +
    +  def createTopic(topic: String) {
    +    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
    +    logInfo("==================== 5 ====================")
    +    // wait until metadata is propagated
    +    waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
    +  }
    +
    +  def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
    +    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
    +    producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
    +    producer.send(createTestMessage(topic, sent): _*)
    +    logInfo("==================== 6 ====================")
    +  }
    +}
    +
    +object KafkaStreamSuite {
    +  val random = new Random()
    +
    +  def createTmpDir(): File = {
    +    val tmp = System.getProperty("java.io.tmpdir")
    +    val f = new File(tmp, "spark-kafka-" + random.nextInt(10000))
    +    f.mkdirs()
    +    f
    +  }
    +
    +  def deleteDir(file: File) {
    --- End diff --
    
    You can directly use the spark.Utils functions deleteRecursively() and
    createTempDir() for this.
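    For illustration, a rough sketch of what that could look like in the two
    affected spots (this assumes the suite can access the private[spark] object
    org.apache.spark.util.Utils, which provides createTempDir() and
    deleteRecursively(File); the method bodies are otherwise copied from the diff):

        import java.io.File

        import org.apache.spark.util.Utils

        // Utils.createTempDir() creates a unique directory under java.io.tmpdir,
        // so the hand-rolled createTmpDir() helper is no longer needed.
        private def getBrokerConfig(port: Int): Properties = {
          val props = new Properties()
          props.put("broker.id", "0")
          props.put("host.name", "localhost")
          props.put("port", port.toString)
          props.put("log.dir", Utils.createTempDir().getAbsolutePath)
          props.put("zookeeper.connect", zkConnect)
          props.put("log.flush.interval.messages", "1")
          props.put("replica.socket.timeout.ms", "1500")
          props
        }

        override def afterFunction() {
          producer.close()
          server.shutdown()
          // Utils.deleteRecursively() removes the whole broker log directory tree,
          // replacing the custom deleteDir() helper.
          brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }

          zkClient.close()
          zookeeper.shutdown()

          super.afterFunction()
        }

    With that change, the createTmpDir() and deleteDir() helpers in the
    KafkaStreamSuite companion object could be dropped entirely.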

