Hi,
I have the following driver. It works when I run it in local[*] mode,
but when I execute it on a standalone cluster I don't get any data
from Kafka.
Does anybody know why that might be?
val sparkConf = new SparkConf().setAppName("KafkaMessageReceiver")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext._
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("checkpoint")
val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
val eventData = dStream
  .map(_._2)
  .window(Seconds(12))
  .map(_.split(","))
  .map(data => Data(data(0), data(1), data(2), data(3), data(4)))
val result = eventData.transform((rdd, time) => {
  sqlContext.registerRDDAsTable(rdd, "data")
  sql("SELECT count(state) FROM data WHERE state='Active'")
})
result.print()
//eventData.foreachRDD(rdd => registerRDDAsTable(rdd, "data"))
ssc.start()
ssc.awaitTermination()