Spark Streaming as it exists today is always micro-batch. You can certainly filter messages using Spark Streaming.
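A minimal sketch of such a filter follows. It assumes the Spark 1.6-era spark-streaming-kafka (Kafka 0.8) integration used in the program quoted below, and it copies the broker and topic names from that program. The isBuySignal rule (first comma-separated field equal to BUY) is a hypothetical stand-in for the pre-built criteria. Each batch interval produces one micro-batch, and filter drops the non-matching messages from it:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object BuySignalFilter {
  // Hypothetical criterion: a message is a "buy signal" if its first
  // comma-separated field is BUY. Replace with the real pre-built rules.
  def isBuySignal(msg: String): Boolean =
    msg.split(",").headOption.exists(_.trim.equalsIgnoreCase("BUY"))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BuySignalFilter").setMaster("local[2]")
    // Every 1-second batch becomes one micro-batch (one RDD) of Kafka messages
    val ssc = new StreamingContext(conf, Seconds(1))
    val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092")
    val topics = Set("newtopic")
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    // Keep only the message value, then drop everything that is not a buy signal
    val signals = dstream.map(_._2).filter(msg => isBuySignal(msg))
    signals.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The predicate is a plain function, so you can unit-test it without starting a StreamingContext.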
On Fri, Apr 22, 2016 at 4:02 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Yep, actually using createDirectStream sounds like a better way of doing it. Am I
> correct that createDirectStream was introduced to overcome micro-batching
> limitations?
>
> In a nutshell, I want to pick up all the messages and keep the signal according to
> pre-built criteria (say, indicating a buy signal) and ignore the pedestals.
>
> Thanks
>
> Dr Mich Talebzadeh
>
> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> On 22 April 2016 at 21:56, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> You can still do sliding windows with createDirectStream, just do your
>> map / extraction of fields before the window.
>>
>> On Fri, Apr 22, 2016 at 3:53 PM, Mich Talebzadeh
>> <mich.talebza...@gmail.com> wrote:
>> > Hi Cody,
>> >
>> > I want to use sliding windows for Complex Event Processing
>> > micro-batching.
>> >
>> > On 22 April 2016 at 21:51, Cody Koeninger <c...@koeninger.org> wrote:
>> >>
>> >> Why are you wanting to convert?
>> >>
>> >> As far as doing the conversion, createStream doesn't take the same
>> >> arguments, look at the docs.
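For reference, here is a sketch of the receiver-based call, assuming the same Kafka 0.8 integration. The typed createStream overload takes the topics as a Map of topic name to receiver thread count (not a Set) plus an explicit StorageLevel. It reads offsets through ZooKeeper, so zookeeper.connect and group.id are the parameters that matter here, not bootstrap.servers. The object name and the single consumer thread are illustrative choices:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverStreamExample {
  // The receiver-based stream uses the ZooKeeper-based consumer,
  // so it needs zookeeper.connect and group.id
  val kafkaParams: Map[String, String] = Map(
    "zookeeper.connect" -> "rhes564:2181",
    "group.id" -> "StreamTest")

  // Unlike createDirectStream's Set[String], topics here is a Map of
  // topic name -> number of consumer threads inside the receiver
  val topicThreads: Map[String, Int] = Map("newtopic" -> 1)

  def main(args: Array[String]): Unit = {
    // local[2]: the receiver permanently occupies one core
    val conf = new SparkConf().setAppName("ReceiverStreamExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // The typed overload has no default storage level, so pass one explicitly
    val dstream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicThreads, StorageLevel.MEMORY_AND_DISK_SER_2)
    dstream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```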
>> >>
>> >> On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
>> >> <mich.talebza...@gmail.com> wrote:
>> >> > Hi,
>> >> >
>> >> > What is the best way of converting this program that uses
>> >> > KafkaUtils.createDirectStream to a sliding window, using createStream, i.e. from
>> >> >
>> >> > val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>> >> >
>> >> > to
>> >> >
>> >> > val dstream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>> >> >
>> >> > The program below works:
>> >> >
>> >> > import org.apache.spark.SparkContext
>> >> > import org.apache.spark.SparkConf
>> >> > import org.apache.spark.sql.Row
>> >> > import org.apache.spark.sql.hive.HiveContext
>> >> > import org.apache.spark.sql.types._
>> >> > import org.apache.spark.sql.SQLContext
>> >> > import org.apache.spark.sql.functions._
>> >> > import _root_.kafka.serializer.StringDecoder
>> >> > import org.apache.spark.streaming._
>> >> > import org.apache.spark.streaming.kafka.KafkaUtils
>> >> > //
>> >> > object CEP_assembly {
>> >> >   def main(args: Array[String]) {
>> >> >     val conf = new SparkConf().
>> >> >       setAppName("CEP_assembly").
>> >> >       setMaster("local[2]").
>> >> >       set("spark.driver.allowMultipleContexts", "true").
>> >> >       set("spark.hadoop.validateOutputSpecs", "false")
>> >> >     val sc = new SparkContext(conf)
>> >> >     // Create sqlContext based on HiveContext
>> >> >     val sqlContext = new HiveContext(sc)
>> >> >     import sqlContext.implicits._
>> >> >     println("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss')").collect.foreach(println)
>> >> >     // Reuse the existing SparkContext instead of building a second one from conf
>> >> >     val ssc = new StreamingContext(sc, Seconds(1))
>> >> >     ssc.checkpoint("checkpoint")
>> >> >     val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092",
>> >> >       "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181",
>> >> >       "group.id" -> "StreamTest")
>> >> >     val topic = Set("newtopic")
>> >> >     //val dstream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>> >> >     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>> >> >     dstream.cache()
>> >> >     //val windowed_dstream = dstream.window(new Duration(sliding_window_length), new Duration(sliding_window_interval))
>> >> >     dstream.print(1000)
>> >> >     val lines = dstream.map(_._2)
>> >> >     // Check for message (split() takes a regex: "\n," matches a newline followed by a comma)
>> >> >     lines.filter(_.contains("Sending dstream")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
>> >> >     // Check for statement cache
>> >> >     lines.filter(_.contains("statement cache")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
>> >> >     ssc.start()
>> >> >     ssc.awaitTermination()
>> >> >     //ssc.stop()
>> >> >     println("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss')").collect.foreach(println)
>> >> >   }
>> >> > }
>> >> >
>> >> > Thanks
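Following Cody's advice, here is a sketch of the windowed version. It extracts the fields with map on the direct stream first and only then calls window. It assumes the same Kafka 0.8 integration. The 10-second window and 2-second slide are hypothetical values, and both must be multiples of the 1-second batch interval:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object CEPWindowSketch {
  // Hypothetical settings: window length and slide must be multiples of the batch interval
  val batchInterval = Seconds(1)
  val windowLength = Seconds(10)
  val slideInterval = Seconds(2)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CEPWindowSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, batchInterval)
    ssc.checkpoint("checkpoint")
    val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092")
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("newtopic"))
    // Step 1: extract the fields you need (here just the message value)
    val lines = dstream.map(_._2)
    // Step 2: window the extracted stream; every 2 s, look at the last 10 s of messages
    val windowed = lines.window(windowLength, slideInterval)
    windowed.filter(_.contains("statement cache"))
      .map(line => (line, 1))
      .reduceByKey(_ + _)
      .print(1000)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

This keeps createDirectStream, so no switch to createStream is needed just to get a sliding window.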