Hi Akhil,

That's exactly what I needed. You saved my day :)
Thanks a lot,
Best,
Zoran

On Fri, Jul 24, 2015 at 12:28 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Yes, that is correct, sorry for confusing you. But I guess this is what you are looking for; let me know if it doesn't help:

    val filteredStatuses = stream.transform(rdd => {
      // Instead of hardcoding, you can fetch these from MySQL, a file, or whatever
      val sampleHashTags =
        Array("#teenchoice", "#android", "#iphone").map(_.toLowerCase)

      // Keep only the statuses whose text mentions one of the tracked hashtags
      rdd.filter(status =>
        sampleHashTags.exists(tag => status.getText.toLowerCase.contains(tag)))
    })

Thanks
Best Regards

On Fri, Jul 24, 2015 at 11:25 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Akhil,

Thank you for sending this code. My apologies if I ask something obvious here, since I'm a newbie in Scala, but I still don't see how I can use this code. Maybe my original question was not very clear.

What I need is to get each Twitter status that contains one of the hashtags I'm following. I'm implementing a learning platform where each course will have at least one hashtag, e.g. #cs270computersciencecourse. If somebody posts anything on Twitter with that hashtag, I want to get it and save the Twitter status in the system, so it can be shown in the application. Other tweets should be ignored, but every tweet containing one of the hashtags I'm following should be stored, so I can't process "most popular tweets" or anything similar where I might miss somebody's post. The stream follows a list of hashtags, and users should be able to update this list.

If I understood well, the code you sent me extracts hashtags from the statuses received through the stream and continues processing those hashtags, but at the end I will have only the hashtags without the statuses.
Is that correct, or did I miss something?

Thanks,
Zoran

On Wed, Jul 22, 2015 at 12:41 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

That was pseudo code; a working version would look like this:

    val stream = TwitterUtils.createStream(ssc, None)

    val hashTags = stream.flatMap(status =>
      status.getText.split(" ").filter(_.startsWith("#"))).map(_.toLowerCase)

    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false)).map(_._2)

    topCounts10.print()

    val filteredStream = topCounts10.transform(rdd => {
      val sampleHashtags = ssc.sparkContext.parallelize(
        Array("#RobinWilliams", "#android", "#iphone").map(_.toLowerCase))
      val newRDD = sampleHashtags.map { x => (x, 1) }
      // join requires pair RDDs, so key the incoming topics as well
      newRDD.join(rdd.map(x => (x, 1)))
    })

    filteredStream.print()

Thanks
Best Regards

On Wed, Jul 22, 2015 at 3:58 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Akhil and Jörn,

I tried, as you suggested, to create a simple scenario, but I get an error on rdd.join(newRDD): "value join is not a member of org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like:

    val stream = TwitterUtils.createStream(ssc, auth)
    val filteredStream = stream.transform(rdd => {
      val samplehashtags = Array("music", "film")
      val newRDD = samplehashtags.map { x => (x, 1) }
      rdd.join(newRDD)
    })

Did I miss something here?

Thanks,
Zoran

On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Thanks for the explanation.

If I understand this correctly, in this approach I would actually stream everything from Twitter and perform the filtering in my application using Spark.
Isn't this too much overhead if my application is only interested in listening for a couple of hundred or a few thousand hashtags?

On the one hand, this is the better approach, since I won't have the problem of opening new streams when the number of hashtags goes over 400, which is the Twitter limit for user-stream filtering; on the other hand, I'm concerned about how much it will affect application performance if I stream everything posted on Twitter and filter it locally. It would be great if somebody with experience in this could comment on these concerns.

Thanks,
Zoran

On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Jörn meant something like this:

    val filteredStream = twitterStream.transform(rdd => {
      val newRDD = ssc.sparkContext
        .textFile("/this/file/will/be/updated/frequently")
        .map(x => (x, 1))
      rdd.join(newRDD)
    })

newRDD will work like a filter when you do the join.

Thanks
Best Regards

On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi Jörn,

I didn't know that it is possible to change the filter without re-opening the Twitter stream. Actually, I already asked this question earlier on Stack Overflow <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming> and got the answer that it's not possible, but it would be even better if there were some other way to add new hashtags, or to remove old hashtags that a user stopped following. I guess the second request would be more difficult.

However, it would be great if you could give me a short example of how to do this.
I didn't understand well from your explanation what you mean by "join it with a rdd loading the newest hash tags from disk in a regular interval".

Thanks,
Zoran

On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com> wrote:

Why do you even want to stop it? You can join it with an RDD loading the newest hash tags from disk at a regular interval.

On Sun, Jul 19, 2015 at 7:40 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

Hi,

I have a Twitter Spark stream initialized in the following way:

    val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
    val config = getTwitterConfigurationBuilder.build()
    val auth: Option[twitter4j.auth.Authorization] =
      Some(new twitter4j.auth.OAuthAuthorization(config))
    val stream = TwitterUtils.createStream(ssc, auth, filters)

This works fine when I initially start it. However, at some point I need to update the filters, since users might add new hashtags they want to follow. I tried to stop the running stream and the Spark streaming context without stopping the Spark context, e.g.:

    stream.stop()
    ssc.stop(false)

Afterwards, I'm trying to initialize a new Twitter stream like I did previously.
However, I got this exception:

    Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException:
    Adding new inputs, transformations, and output operations after stopping a context is not supported
        at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
        at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
        at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
        at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
        at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
        at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
        at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
        at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
        at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
        at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
        at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
        at java.util.TimerThread.mainLoop(Timer.java:555)
        at java.util.TimerThread.run(Timer.java:505)
    INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
    ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0
    org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)

Can anybody explain how to solve this issue?

Thanks,
Zoran

--
*******************************************************************************
Zoran Jeremic, PhD
Senior System Analyst & Programmer

Athabasca University
Tel: +1 604 92 89 944
E-mail: zoran.jere...@gmail.com
Homepage: http://zoranjeremic.org
**********************************************************************************
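[Editor's note] Akhil's final suggestion in the thread above filters each status by checking its text against a tracked-hashtag list. The matching predicate itself is plain Scala and can be tried without Spark or twitter4j; in this sketch the tag values and the helper name `containsTrackedTag` are illustrative, not from the thread:

```scala
// Tracked hashtags, normalized to lower case once up front
val trackedTags: Array[String] =
  Array("#teenchoice", "#android", "#iphone").map(_.toLowerCase)

// True if the text mentions any tracked hashtag, case-insensitively
def containsTrackedTag(text: String): Boolean = {
  val lower = text.toLowerCase
  trackedTags.exists(tag => lower.contains(tag))
}
```

Inside the `transform`, the same predicate would be applied as `rdd.filter(status => containsTrackedTag(status.getText))`.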
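[Editor's note] The "value join is not a member of RDD[twitter4j.Status]" error discussed in the thread arises because `join` is only defined on RDDs of key-value pairs (via `PairRDDFunctions`), and the right-hand side must itself be an RDD, not a local `Array`. The pairing requirement can be illustrated with plain Scala collections; the tag names and status texts here are invented:

```scala
// Both sides of a join must be keyed; here the key is the hashtag
val trackedPairs = Map("music" -> 1, "film" -> 1)

// Statuses keyed by one of the hashtags they contain
val keyedStatuses = Seq(
  ("music", "new album out #music"),
  ("news",  "breaking story #news")
)

// Keeping only keys present on both sides is what an RDD join does
val joined = keyedStatuses.filter { case (tag, _) => trackedPairs.contains(tag) }
```

In Spark this means first flatMapping each Status into (hashtag, status) pairs and parallelizing the tag list into an RDD, then joining; statuses with no tracked hashtag simply drop out of the join.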
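[Editor's note] The file-based variant Jörn and Akhil describe keeps the hashtag list on disk so users can change it while the stream runs. The reloading step is ordinary Scala I/O; in this sketch the file name `tracked_tags.txt`, the one-tag-per-line format, and the helper name `loadTags` are assumptions for illustration:

```scala
import java.nio.file.{Files, Paths}
import scala.io.Source

// Create a sample tag file, one hashtag per line
Files.write(Paths.get("tracked_tags.txt"), "#music\n#film\n".getBytes("UTF-8"))

// Re-read the current tags, normalizing as we go; invoked per batch,
// this picks up edits without restarting the streaming context
def loadTags(file: String): Set[String] = {
  val src = Source.fromFile(file, "UTF-8")
  try src.getLines().map(_.trim.toLowerCase).filter(_.nonEmpty).toSet
  finally src.close()
}

val currentTags = loadTags("tracked_tags.txt")
```

Calling `loadTags` (or `sc.textFile`, as in the thread) inside `transform` lets each micro-batch see the latest list, which avoids the stop-and-recreate path that raises the IllegalStateException shown above.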