Yes, that is correct, sorry for the confusion. But I guess this is what you are looking for; let me know if it doesn't help:
val filtered_statuses = stream.transform(rdd => {
  // Instead of hardcoding, you can fetch these from MySQL, a file, or wherever
  val sampleHashTags = Array("#teenchoice", "#android", "#iphone").map(_.toLowerCase)
  val filtered = rdd.filter(status => {
    var found = false
    for (tag <- sampleHashTags) {
      if (status.getText.toLowerCase.contains(tag)) found = true
    }
    found
  })
  filtered
})

Thanks
Best Regards

On Fri, Jul 24, 2015 at 11:25 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

> Hi Akhil,
>
> Thank you for sending this code. My apologies if I ask something obvious
> here, since I'm a newbie in Scala, but I still don't see how I can use
> this code. Maybe my original question was not very clear.
>
> What I need is to get each Twitter status that contains one of the
> hashtags I'm following. I'm implementing a learning platform where each
> course will have at least one hashtag, e.g. #cs270computersciencecourse.
> If somebody posts anything on Twitter with that hashtag, I want to get it
> and save the Twitter status in the system, so it can be shown in the
> application. Other tweets should be ignored, but each tweet containing one
> of the hashtags I'm following should be stored in the application, so I
> can't process "most popular tweets" or anything similar where it's
> possible that I miss somebody's post. There is a list of hashtags followed
> by the stream, and it should be possible for users to update this list.
>
> If I understood well, the code you sent extracts hashtags from statuses
> received through the stream and continues processing these hashtags, but
> at the end I will have only hashtags without statuses. Is that correct, or
> did I miss something?
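As an aside, the mutable-flag loop in the `rdd.filter` body at the top of this message can be collapsed into a single `exists` call. A minimal sketch of the same predicate as a plain function; `containsTrackedTag` is an illustrative name, not part of the original code:

```scala
// Same logic as the rdd.filter body above, without the mutable flag:
// true if the text contains any of the tracked hashtags (case-insensitive).
def containsTrackedTag(text: String, tags: Seq[String]): Boolean = {
  val lower = text.toLowerCase
  tags.exists(tag => lower.contains(tag.toLowerCase))
}
```

In the stream this would be used as `rdd.filter(status => containsTrackedTag(status.getText, sampleHashTags))`.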
>
> Thanks,
> Zoran
>
> On Wed, Jul 22, 2015 at 12:41 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> That was pseudo code; a working version would look like this:
>>
>> val stream = TwitterUtils.createStream(ssc, None)
>>
>> // Extract lowercased hashtags from each status
>> val hashTags = stream.flatMap(status =>
>>   status.getText.split(" ").filter(_.startsWith("#"))).map(_.toLowerCase)
>>
>> val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
>>   .map { case (topic, count) => (count, topic) }
>>   .transform(_.sortByKey(false)).map(x => x._2)
>>
>> topCounts10.print()
>>
>> val filteredStream = topCounts10.transform(rdd => {
>>   val samplehashtags = ssc.sparkContext.parallelize(
>>     Array("#RobinWilliams", "#android", "#iphone").map(_.toLowerCase))
>>   val newRDD = samplehashtags.map { x => (x, 1) }
>>   // Both sides must be pair RDDs for join to be available
>>   val joined = newRDD.join(rdd.map(x => (x, 1)))
>>   joined
>> })
>>
>> filteredStream.print()
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Jul 22, 2015 at 3:58 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:
>>
>>> Hi Akhil and Jorn,
>>>
>>> I tried as you suggested to create a simple scenario, but I get an error
>>> on rdd.join(newRDD): "value join is not a member of
>>> org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like:
>>>
>>>> val stream = TwitterUtils.createStream(ssc, auth)
>>>> val filteredStream = stream.transform(rdd => {
>>>>   val samplehashtags = Array("music", "film")
>>>>   val newRDD = samplehashtags.map { x => (x, 1) }
>>>>   rdd.join(newRDD)
>>>> })
>>>
>>> Did I miss something here?
>>>
>>> Thanks,
>>> Zoran
>>>
>>> On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:
>>>
>>>> Thanks for the explanation.
>>>>
>>>> If I understand this correctly, in this approach I would actually
>>>> stream everything from Twitter and perform the filtering in my
>>>> application using Spark. Isn't this too much overhead if my application
>>>> is interested in listening for a couple of hundred or thousand hashtags?
>>>> On one side, this approach is better since I will not have the problem
>>>> of opening new streams if the number of hashtags goes over 400, which
>>>> is the Twitter limit for user stream filtering; but on the other side
>>>> I'm concerned about how much it will affect application performance if
>>>> I stream everything posted on Twitter and filter it locally. It would
>>>> be great if somebody with experience on this could comment on these
>>>> concerns.
>>>>
>>>> Thanks,
>>>> Zoran
>>>>
>>>> On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>
>>>>> Jorn meant something like this:
>>>>>
>>>>> val filteredStream = twitterStream.transform(rdd => {
>>>>>   val newRDD = ssc.sparkContext
>>>>>     .textFile("/this/file/will/be/updated/frequently")
>>>>>     .map(x => (x, 1))
>>>>>   rdd.join(newRDD)
>>>>> })
>>>>>
>>>>> newRDD will work like a filter when you do the join.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jorn,
>>>>>>
>>>>>> I didn't know that it is possible to change the filter without
>>>>>> re-opening the Twitter stream. Actually, I already asked that question
>>>>>> earlier on Stack Overflow
>>>>>> <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming>
>>>>>> and got the answer that it's not possible, but it would be even better
>>>>>> if there were some other way to add new hashtags or to remove old
>>>>>> hashtags that users stopped following. I guess the second request
>>>>>> would be more difficult.
>>>>>>
>>>>>> However, it would be great if you could give me a short example of how
>>>>>> to do this. I didn't understand well from your explanation what you
>>>>>> meant by "join it with a rdd loading the newest hash tags from disk in
>>>>>> a regular interval".
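For the join in the snippet above to compile against an `RDD[Status]`, both sides have to be pair RDDs, which is exactly the error reported earlier in the thread. A rough, untested sketch of what the full version might look like, keying each status by the hashtags it contains (`twitterStream` and `ssc` are the stream and StreamingContext from the thread; the file path is the placeholder from the original snippet):

```scala
import twitter4j.Status

val filteredStream = twitterStream.transform { rdd =>
  // Re-read the tracked hashtags every batch interval, so the file can be
  // updated without restarting the stream.
  val trackedTags = ssc.sparkContext
    .textFile("/this/file/will/be/updated/frequently") // one lowercase hashtag per line
    .map(tag => (tag, ()))

  // Key each status by every hashtag it contains; the join then drops
  // statuses whose hashtags are not in the file.
  rdd
    .flatMap(status =>
      status.getText.split(" ")
        .filter(_.startsWith("#"))
        .map(tag => (tag.toLowerCase, status)))
    .join(trackedTags)
    .map { case (_, (status, _)) => status }
}
```

Note that a status containing several tracked hashtags comes out of the join once per matching tag; appending `.distinct()` would deduplicate if that matters.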
>>>>>>
>>>>>> Thanks,
>>>>>> Zoran
>>>>>>
>>>>>> On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>>>>>>
>>>>>>> Why do you even want to stop it? You can join it with an RDD loading
>>>>>>> the newest hashtags from disk at a regular interval.
>>>>>>>
>>>>>>> On Sun, 19 Jul 2015 at 7:40, Zoran Jeremic <zoran.jere...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have a Twitter Spark stream initialized in the following way:
>>>>>>>>
>>>>>>>>> val ssc: StreamingContext =
>>>>>>>>>   SparkLauncher.getSparkScalaStreamingContext()
>>>>>>>>> val config = getTwitterConfigurationBuilder.build()
>>>>>>>>> val auth: Option[twitter4j.auth.Authorization] =
>>>>>>>>>   Some(new twitter4j.auth.OAuthAuthorization(config))
>>>>>>>>> val stream = TwitterUtils.createStream(ssc, auth, filters)
>>>>>>>>
>>>>>>>> This works fine when I initially start it. However, at some point I
>>>>>>>> need to update the filters, since users might add new hashtags they
>>>>>>>> want to follow. I tried to stop the running stream and the Spark
>>>>>>>> streaming context without stopping the Spark context, e.g.:
>>>>>>>>
>>>>>>>>> stream.stop()
>>>>>>>>> ssc.stop(false)
>>>>>>>>
>>>>>>>> Afterward, I'm trying to initialize a new Twitter stream like I did
>>>>>>>> previously.
>>>>>>>> However, I got this exception:
>>>>>>>>
>>>>>>>>> Exception in thread "Firestorm JMX Monitor"
>>>>>>>>> java.lang.IllegalStateException: Adding new inputs, transformations,
>>>>>>>>> and output operations after stopping a context is not supported
>>>>>>>>> at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>>>>>>>> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>>>>>> at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>>>>>>>> at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>>>>>>>> at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>>>>>>>> at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>>>>>>>> at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>>>>>>>> at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>>>>>>>> at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>>>>>>>> at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>>>>>>>> at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>>>>>>>> at java.util.TimerThread.mainLoop(Timer.java:555)
>>>>>>>>> at java.util.TimerThread.run(Timer.java:505)
>>>>>>>>> INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]]
>>>>>>>>> twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
>>>>>>>>> ERROR [2015-07-18 22:24:32,503]
>>>>>>>>> [sparkDriver-akka.actor.default-dispatcher-3]
>>>>>>>>> streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75)
>>>>>>>>> Error stopping receiver 0
>>>>>>>>> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>>>>>>>
>>>>>>>> Can anybody explain how to solve this issue?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Zoran
>>>>>>
>>>>>> --
>>>>>> *******************************************************************************
>>>>>> Zoran Jeremic, PhD
>>>>>> Senior System Analyst & Programmer
>>>>>>
>>>>>> Athabasca University
>>>>>> Tel: +1 604 92 89 944
>>>>>> E-mail: zoran.jere...@gmail.com <zoran.jere...@va.mod.gov.rs>
>>>>>> Homepage: http://zoranjeremic.org
>>>>>> **********************************************************************************
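For anyone hitting the same IllegalStateException as in the original post: a stopped StreamingContext cannot accept new DStreams, so the restart path has to build a fresh context rather than reuse the old one. A rough sketch, assuming the SparkContext `sc`, the `auth` option, and an updated `filters` list from the original post (batch interval is illustrative):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// Stop the streams but keep the underlying SparkContext alive.
ssc.stop(stopSparkContext = false, stopGracefully = true)

// A stopped StreamingContext is unusable; create a new one on the same
// SparkContext and re-register the stream with the updated filters.
val newSsc = new StreamingContext(sc, Seconds(1))
val newStream = TwitterUtils.createStream(newSsc, auth, filters)
// ... re-attach the output operations (foreachRDD, print, saves) here ...
newSsc.start()
```

The transform-plus-join approach discussed earlier in the thread avoids this restart entirely, since the hashtag list is reloaded per batch instead of being baked into the receiver's filter.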