GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/1751#discussion_r15788804
--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
@@ -17,28 +17,210 @@
package org.apache.spark.streaming.kafka
-import kafka.serializer.StringDecoder
+import java.io.File
+import java.net.InetSocketAddress
+import java.util.{Properties, Random}
+
+import scala.collection.mutable
+
+import kafka.admin.CreateTopicCommand
+import kafka.common.TopicAndPartition
+import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
+import kafka.utils.ZKStringSerializer
+import kafka.serializer.{StringDecoder, StringEncoder}
+import kafka.server.{KafkaConfig, KafkaServer}
+
+import org.I0Itec.zkclient.ZkClient
+
+import org.apache.zookeeper.server.ZooKeeperServer
+import org.apache.zookeeper.server.NIOServerCnxnFactory
+
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
class KafkaStreamSuite extends TestSuiteBase {
+ import KafkaStreamSuite._
+
+ val zkConnect = "localhost:2181"
+ val zkConnectionTimeout = 6000
+ val zkSessionTimeout = 6000
+
+ val brokerPort = 9092
+ val brokerProps = getBrokerConfig(brokerPort)
+ val brokerConf = new KafkaConfig(brokerProps)
+
+ protected var zookeeper: EmbeddedZookeeper = _
+ protected var zkClient: ZkClient = _
+ protected var server: KafkaServer = _
+ protected var producer: Producer[String, String] = _
+
+ override def useManualClock = false
+
+ override def beforeFunction() {
+ // Zookeeper server startup
+ zookeeper = new EmbeddedZookeeper(zkConnect)
+ logInfo("==================== 0 ====================")
+ zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+ logInfo("==================== 1 ====================")
+
+ // Kafka broker startup
+ server = new KafkaServer(brokerConf)
+ logInfo("==================== 2 ====================")
+ server.startup()
+ logInfo("==================== 3 ====================")
+ Thread.sleep(2000)
+ logInfo("==================== 4 ====================")
+ super.beforeFunction()
+ }
- test("kafka input stream") {
+ override def afterFunction() {
+ producer.close()
+ server.shutdown()
+ brokerConf.logDirs.foreach { f => deleteDir(new File(f)) }
+
+ zkClient.close()
+ zookeeper.shutdown()
+
+ super.afterFunction()
+ }
+
+ test("Kafka input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
- val topics = Map("my-topic" -> 1)
-
- // tests the API, does not actually test data receiving
- val test1: ReceiverInputDStream[(String, String)] =
- KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
- val test2: ReceiverInputDStream[(String, String)] =
- KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
- val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
- val test3: ReceiverInputDStream[(String, String)] =
- KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
-
- // TODO: Actually test receiving data
+ val topic = "topic1"
+ val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+ createTopic(topic)
+ produceAndSendMessage(topic, sent)
+
+ val kafkaParams = Map("zookeeper.connect" -> zkConnect,
+ "group.id" -> s"test-consumer-${random.nextInt(10000)}",
+ "auto.offset.reset" -> "smallest")
+
+ val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc,
+ kafkaParams,
+ Map(topic -> 1),
+ StorageLevel.MEMORY_ONLY)
+ val result = new mutable.HashMap[String, Long]()
+ stream.map { case (k, v) => v }
+ .countByValue()
+ .foreachRDD { r =>
+ val ret = r.collect()
+ ret.toMap.foreach { kv =>
+ val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+ result.put(kv._1, count)
+ }
+ }
+ ssc.start()
+ ssc.awaitTermination(3000)
+
+ assert(sent.size === result.size)
+ sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
+
ssc.stop()
}
+
+ private def getBrokerConfig(port: Int): Properties = {
--- End diff --
It's better to move all these utility functions for generating the local Kafka
test harness into a separate class, say KafkaTestUtils.
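For illustration, here is a rough sketch of what such a KafkaTestUtils object
could look like, based on the helpers in this diff. The object name and method
signatures are hypothetical, using the same Kafka 0.8-era APIs already imported
above:

```scala
import java.util.Properties

import kafka.admin.CreateTopicCommand
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder
import org.I0Itec.zkclient.ZkClient

// Hypothetical helper object; names and signatures are illustrative only.
object KafkaTestUtils {

  // Broker properties pointing at the embedded Zookeeper instance.
  def getBrokerConfig(port: Int, zkConnect: String): Properties = {
    val props = new Properties()
    props.put("broker.id", "0")
    props.put("host.name", "localhost")
    props.put("port", port.toString)
    props.put("zookeeper.connect", zkConnect)
    props
  }

  // Producer properties for the given broker, using the string encoder.
  def getProducerConfig(brokerList: String): Properties = {
    val props = new Properties()
    props.put("metadata.broker.list", brokerList)
    props.put("serializer.class", classOf[StringEncoder].getName)
    props
  }

  // Create the topic, then send each message the requested number of times.
  def produceAndSendMessage(
      zkClient: ZkClient,
      brokerList: String,
      topic: String,
      sent: Map[String, Int]) {
    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
    val producer = new Producer[String, String](
      new ProducerConfig(getProducerConfig(brokerList)))
    val messages = sent.flatMap { case (msg, n) =>
      (1 to n).map(_ => new KeyedMessage[String, String](topic, msg))
    }.toSeq
    producer.send(messages: _*)
    producer.close()
  }
}
```

That would keep the embedded Zookeeper/broker lifecycle and message-production
plumbing out of the suite itself, and other Kafka suites could reuse it.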