Hi, what is the best way of converting this program, which uses KafkaUtils.createDirectStream, to a sliding window — i.e. changing
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)

to

val dstream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)

The program below works:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
object CEP_assembly {
  def main(args: Array[String]) {
    val conf = new SparkConf().
      setAppName("CEP_assembly").
      setMaster("local[2]").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    println ("\nStarted at");
    sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("checkpoint")
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "StreamTest"
    )
    val topic = Set("newtopic")
    //val dstream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    dstream.cache()
    //val windowed_dstream = dstream.window(new Duration(sliding_window_length), new Duration(sliding_window_interval))
    dstream.print(1000)
    val lines = dstream.map(_._2)
    // Check for message
    val showResults = lines.filter(_.contains("Sending dstream")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
    // Check for statement cache
    val showResults2 = lines.filter(_.contains("statement cache")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
    println ("\nFinished at");
    sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
  }
}

Thanks,

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com