Hi Cody,

This is my first attempt at using offset ranges (this may not mean much in my context at the moment).
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("checkpoint")
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "rhes564:9092",
  "schema.registry.url" -> "http://rhes564:8081",
  "zookeeper.connect" -> "rhes564:2181",
  "group.id" -> "StreamTest")
val topics = Set("newtopic")
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
dstream.cache()
val lines = dstream.map(_._2)
val showResults = lines.filter(_.contains("statement cache")).
  flatMap(line => line.split("\n,")).
  map(word => (word, 1)).
  reduceByKey(_ + _)
// Define the offset ranges to read in the batch job. Just one offset range
val offsetRanges = Array(
  OffsetRange("newtopic", 0, 110, 220)
)
// Create the RDD based on the offset ranges
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)

This comes back with the following error:

[info] Compiling 1 Scala source to /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
[error] /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37: not found: value OffsetRange
[error]   OffsetRange("newtopic", 0, 110, 220),
[error]   ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
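My suspicion is that the immediate problem is just a missing import: as far as I can tell, OffsetRange lives in the same package as KafkaUtils, org.apache.spark.streaming.kafka, in the spark-streaming-kafka (Kafka 0.8) artifact. Below is a rough, untested sketch of what I was planning to try, reusing the sc and kafkaParams already defined above:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

// One offset range: partition 0 of "newtopic", offsets 110 (inclusive) to 220 (exclusive)
val offsetRanges = Array(OffsetRange("newtopic", 0, 110, 220))

// Batch read of exactly those offsets as an RDD of (key, value) pairs
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)

// Peek at a few message payloads
rdd.map(_._2).take(10).foreach(println)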
Any ideas will be appreciated.

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 22 April 2016 at 22:04, Cody Koeninger <c...@koeninger.org> wrote:
> Spark Streaming as it exists today is always microbatch.
>
> You can certainly filter messages using Spark Streaming.
>
>
> On Fri, Apr 22, 2016 at 4:02 PM, Mich Talebzadeh
> <mich.talebza...@gmail.com> wrote:
> > Yep, actually using createDirectStream sounds like a better way of doing it.
> > Am I correct that createDirectStream was introduced to overcome
> > micro-batching limitations?
> >
> > In a nutshell, I want to pick up all the messages, keep the signals that match
> > pre-built criteria (say, indicating a buy signal) and ignore the pedestals.
> >
> > Thanks
> >
> > Dr Mich Talebzadeh
> >
> > LinkedIn
> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> > On 22 April 2016 at 21:56, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> You can still do sliding windows with createDirectStream, just do your
> >> map / extraction of fields before the window.
> >>
> >> On Fri, Apr 22, 2016 at 3:53 PM, Mich Talebzadeh
> >> <mich.talebza...@gmail.com> wrote:
> >> > Hi Cody,
> >> >
> >> > I want to use sliding windows for Complex Event Processing
> >> > micro-batching.
> >> >
> >> > Dr Mich Talebzadeh
> >> >
> >> > LinkedIn
> >> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> >
> >> > http://talebzadehmich.wordpress.com
> >> >
> >> >
> >> > On 22 April 2016 at 21:51, Cody Koeninger <c...@koeninger.org> wrote:
> >> >>
> >> >> Why are you wanting to convert?
> >> >>
> >> >> As far as doing the conversion, createStream doesn't take the same
> >> >> arguments, look at the docs.
> >> >>
> >> >> On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
> >> >> <mich.talebza...@gmail.com> wrote:
> >> >> > Hi,
> >> >> >
> >> >> > What is the best way of converting this program, which uses
> >> >> > KafkaUtils.createDirectStream, to a sliding window, i.e. going from
> >> >> >
> >> >> > val dstream = KafkaUtils.createDirectStream[String, String,
> >> >> > StringDecoder, StringDecoder](ssc, kafkaParams, topic)
> >> >> >
> >> >> > to
> >> >> >
> >> >> > val dstream = KafkaUtils.createStream[String, String,
> >> >> > StringDecoder, StringDecoder](ssc, kafkaParams, topic)
> >> >> >
> >> >> > The program below works:
> >> >> >
> >> >> > import org.apache.spark.SparkContext
> >> >> > import org.apache.spark.SparkConf
> >> >> > import org.apache.spark.sql.Row
> >> >> > import org.apache.spark.sql.hive.HiveContext
> >> >> > import org.apache.spark.sql.types._
> >> >> > import org.apache.spark.sql.SQLContext
> >> >> > import org.apache.spark.sql.functions._
> >> >> > import _root_.kafka.serializer.StringDecoder
> >> >> > import org.apache.spark.streaming._
> >> >> > import org.apache.spark.streaming.kafka.KafkaUtils
> >> >> > //
> >> >> > object CEP_assembly {
> >> >> >   def main(args: Array[String]) {
> >> >> >     val conf = new SparkConf().
> >> >> >       setAppName("CEP_assembly").
> >> >> >       setMaster("local[2]").
> >> >> >       set("spark.driver.allowMultipleContexts", "true").
> >> >> >       set("spark.hadoop.validateOutputSpecs", "false")
> >> >> >     val sc = new SparkContext(conf)
> >> >> >     // Create sqlContext based on HiveContext
> >> >> >     val sqlContext = new HiveContext(sc)
> >> >> >     import sqlContext.implicits._
> >> >> >     val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> >> >> >     println("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> >> >> >     val ssc = new StreamingContext(conf, Seconds(1))
> >> >> >     ssc.checkpoint("checkpoint")
> >> >> >     val kafkaParams = Map[String, String](
> >> >> >       "bootstrap.servers" -> "rhes564:9092",
> >> >> >       "schema.registry.url" -> "http://rhes564:8081",
> >> >> >       "zookeeper.connect" -> "rhes564:2181",
> >> >> >       "group.id" -> "StreamTest")
> >> >> >     val topic = Set("newtopic")
> >> >> >     //val dstream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
> >> >> >     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
> >> >> >     dstream.cache()
> >> >> >     //val windowed_dstream = dstream.window(new Duration(sliding_window_length), new Duration(sliding_window_interval))
> >> >> >     dstream.print(1000)
> >> >> >     val lines = dstream.map(_._2)
> >> >> >     // Check for message
> >> >> >     val showResults = lines.filter(_.contains("Sending dstream")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
> >> >> >     // Check for statement cache
> >> >> >     val showResults2 = lines.filter(_.contains("statement cache")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
> >> >> >     ssc.start()
> >> >> >     ssc.awaitTermination()
> >> >> >     //ssc.stop()
> >> >> >     println("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> >> >> >   }
> >> >> > }
> >> >> >
> >> >> > Thanks
> >> >> >
> >> >> > Dr Mich Talebzadeh
> >> >> >
> >> >> > LinkedIn
> >> >> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> >> >
> >> >> > http://talebzadehmich.wordpress.com
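P.S. On Cody's earlier point about doing the map / field extraction before the window: if I have understood it correctly, in my program above it would look roughly like the sketch below. This is untested, and the 60-second window length and 10-second slide are arbitrary figures purely for illustration.

// Extract the message payload first, then apply the sliding window
val lines = dstream.map(_._2)
val windowedLines = lines.window(Seconds(60), Seconds(10))

// Same word-count style check as before, but computed over the sliding window
val windowedCounts = windowedLines.
  filter(_.contains("statement cache")).
  flatMap(line => line.split("\n,")).
  map(word => (word, 1)).
  reduceByKey(_ + _)
windowedCounts.print(1000)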