Hi all,

A couple of days ago I tried to integrate Spark SQL with Spark Streaming. My
understanding is that I can convert each RDD from the DStream into a SchemaRDD
and run SQL queries on it, but so far I have had no luck.
Could you help me take a look at my code? Thank you very much!

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Assumed definition: the original mail does not show Record, but getRecord
// below builds it from two String fields.
case class Record(first: String, second: String)

object KafkaSpark {

  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics>
<numThreads>")
      System.exit(1)
    }


    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaSpark")
    // Create a single SparkContext and build both the StreamingContext and the
    // SQLContext on top of it; constructing a second SparkContext from the
    // same conf does not work.
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val sqlContext = new SQLContext(sc)
//    ssc.checkpoint("checkpoint")

    // Importing the SQL context gives access to all the SQL functions and implicit conversions.
    import sqlContext._


    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Parse each Kafka message value into a Record.
    val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
      .map(t => getRecord(t._2.split("#")))

    // For each micro-batch, register the RDD of Records as a table (via the
    // implicit createSchemaRDD conversion) and run a SQL query against it.
    recordsStream.foreachRDD { recRDD =>
      recRDD.registerAsTable("records")
      val result = sql("select * from records")
      result.collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()

  }

  def getRecord(l: Array[String]): Record = {
    println("Getting the record")
    Record(l(0), l(1))
  }
}
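
For reference, here is the minimal batch-only version of the pattern I described
above, with the streaming part stripped out. It is only a sketch under my
assumptions (Spark 1.x, the createSchemaRDD implicit pulled in by
import sqlContext._, and a made-up two-field Record), so I may be misreading
how the SchemaRDD conversion is meant to work:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical two-field record, only for this sketch.
case class Record(first: String, second: String)

object SqlOnRddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SqlOnRddSketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    // Brings the sql(...) shortcut and the implicit RDD -> SchemaRDD conversion into scope.
    import sqlContext._

    // The RDD of case-class instances is implicitly converted to a SchemaRDD here.
    val records = sc.parallelize(Seq(Record("a", "1"), Record("b", "2")))
    records.registerAsTable("records")

    // Query the registered table and bring the rows back to the driver.
    sql("select * from records").collect().foreach(println)

    sc.stop()
  }
}

I collect() before printing so the rows show up in the driver output rather
than on the executors; the foreachRDD body above does the same.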
