Hi, I'm trying to run a simple Kafka + Spark Streaming example in spark-shell:
sc.stop
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel

val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("zookeeper.connect" -> "127.0.0.1:2181", "group.id" -> "test")

val messages = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)

messages.foreachRDD { pairRDD =>
  println(s"DataListener.listen() [pairRDD = ${pairRDD}]")
  println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")
  pairRDD.foreach(row => println(s"DataListener.listen() [row = ${row}]"))
}

ssc.start()
ssc.awaitTermination()

In the Spark output I'm able to find only the following println log:

println(s"DataListener.listen() [pairRDD = ${pairRDD}]")

but unfortunately I can't find the output of:

println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")

and

println(s"DataListener.listen() [row = ${row}]")

Here is my full spark-shell output - http://pastebin.com/sfxbYYga <http://pastebin.com/sfxbYYga>

Any ideas what I'm doing wrong? Thanks!