Yes, that is correct, sorry for the confusion. I guess this is what you
are looking for; let me know if it doesn't help:

val filtered_statuses = stream.transform(rdd => {

  // Instead of hardcoding, you can fetch these from MySQL, a file, or any other source
  val sampleHashTags = Array("#teenchoice", "#android", "#iphone").map(_.toLowerCase)

  // Keep only the statuses whose text contains at least one of the hashtags
  rdd.filter(status => {
    val text = status.getText.toLowerCase
    sampleHashTags.exists(tag => text.contains(tag))
  })
})
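
One possible refinement, as a minimal sketch only: `contains` is a substring match, so a tag like "#android" would also match "#androidx". The helper below compares whole hashtag tokens instead. `HashtagMatch`, `hashtagsIn` and `matchesAny` are hypothetical names made up for this example; they are not part of Spark or twitter4j.

```scala
object HashtagMatch {
  // Extract hashtags as whole whitespace-separated tokens, lowercased,
  // with trailing punctuation stripped ("#Android!" becomes "#android").
  def hashtagsIn(text: String): Set[String] =
    text.split("\\s+")
      .filter(_.startsWith("#"))
      .map(_.toLowerCase.replaceAll("[^\\p{L}\\p{N}_]+$", ""))
      .toSet

  // True when the text carries at least one of the followed hashtags.
  // `followed` is assumed to hold lowercased tags that include the leading '#'.
  def matchesAny(text: String, followed: Set[String]): Boolean =
    hashtagsIn(text).exists(followed.contains)
}
```

Inside the transform above, the filter could then become `rdd.filter(status => HashtagMatch.matchesAny(status.getText, followed))`, assuming `followed` is a `Set[String]` of lowercased hashtags.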

Thanks
Best Regards
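
On the comment about fetching the tags from a file: a rough sketch of what that could look like, assuming a plain text file with one hashtag per line. `HashtagFile` and `load` are hypothetical names used only for illustration, and the file layout is an assumption.

```scala
import scala.io.Source

object HashtagFile {
  // Read one hashtag per line; surrounding whitespace is trimmed,
  // blank lines are skipped, and tags are lowercased.
  def load(path: String): Set[String] = {
    val source = Source.fromFile(path, "UTF-8")
    try source.getLines().map(_.trim.toLowerCase).filter(_.nonEmpty).toSet
    finally source.close()
  }
}
```

The function passed to `transform` is evaluated on the driver for every batch, so calling `HashtagFile.load(...)` at the top of that function should pick up edits to the file from the next batch onward, without restarting the stream.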

On Fri, Jul 24, 2015 at 11:25 AM, Zoran Jeremic <zoran.jere...@gmail.com>
wrote:

> Hi Akhil,
>
> Thank you for sending this code. My apologies if I ask something that is
> obvious here, since I'm a newbie in Scala, but I still don't see how I can
> use this code. Maybe my original question was not very clear.
>
> What I need is to get each Twitter Status that contains one of the
> hashtags I'm following. I'm implementing a learning platform, where each
> course will have at least one hashtag, e.g. #cs270computersciencecourse. If
> somebody posts anything on Twitter with that hashtag, I want to get it and
> save the Twitter status in the system, so it can be shown in the application.
> Other tweets should be ignored, but each tweet containing one of the
> hashtags I'm following should be stored in the application, so I can't
> process "most popular tweets" or something like that, where it's possible
> that I miss somebody's post. There is a list of hashtags followed by the
> stream, and users should be able to update this list.
>
> If I understood correctly, the code you sent me extracts hashtags from the
> statuses received through the stream and continues processing these
> hashtags, but at the end I will have only hashtags without statuses. Is
> that correct, or did I miss something?
>
> Thanks,
> Zoran
>
>
>
> On Wed, Jul 22, 2015 at 12:41 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> That was pseudocode; a working version would look like this:
>>
>>     val stream = TwitterUtils.createStream(ssc, None)
>>
>>     // Lowercased hashtags found in each status
>>     val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))).map(_.toLowerCase)
>>
>>     // (hashtag, count) pairs over a 10-second window, most frequent first
>>     val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
>>       .map { case (topic, count) => (count, topic) }
>>       .transform(_.sortByKey(false))
>>       .map { case (count, topic) => (topic, count) }
>>
>>     topCounts10.print()
>>
>>
>>     // The join keeps only the hashtags that also appear in samplehashtags
>>     val filteredStream = topCounts10.transform(rdd => {
>>       val samplehashtags = ssc.sparkContext.parallelize(Array("#RobinWilliams".toLowerCase, "#android".toLowerCase, "#iphone".toLowerCase))
>>       val newRDD = samplehashtags.map { x => (x, 1) }
>>       newRDD.join(rdd)
>>     })
>>
>>     filteredStream.print()
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Jul 22, 2015 at 3:58 AM, Zoran Jeremic <zoran.jere...@gmail.com>
>> wrote:
>>
>>> Hi Akhil and Jorn,
>>>
>>> I tried as you suggested to create some simple scenario, but I have an
>>> error on rdd.join(newRDD):  "value join is not a member of
>>> org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like:
>>>
>>>     val stream = TwitterUtils.createStream(ssc, auth)
>>>     val filteredStream = stream.transform(rdd => {
>>>       val samplehashtags = Array("music", "film")
>>>       val newRDD = samplehashtags.map { x => (x, 1) }
>>>       rdd.join(newRDD)
>>>     })
>>>
>>>
>>> Did I miss something here?
>>>
>>> Thanks,
>>> Zoran
>>>
>>> On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic <zoran.jere...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the explanation.
>>>>
>>>> If I understand this correctly, in this approach I would actually
>>>> stream everything from Twitter and perform the filtering in my application
>>>> using Spark. Isn't this too much overhead if my application is interested
>>>> in listening for a couple of hundred or thousand hashtags?
>>>> On one side, this would be a better approach, since I would not have the
>>>> problem of opening new streams if the number of hashtags goes over 400,
>>>> which is the Twitter limit for User stream filtering, but on the other
>>>> side I'm concerned about how much it will affect application performance
>>>> if I stream everything that is posted on Twitter and filter it locally.
>>>> It would be great if somebody with experience on this could comment on
>>>> these concerns.
>>>>
>>>> Thanks,
>>>> Zoran
>>>>
>>>> On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> Jorn meant something like this:
>>>>>
>>>>> val filteredStream = twitterStream.transform(rdd => {
>>>>>   val newRDD = ssc.sparkContext.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
>>>>>   rdd.join(newRDD)
>>>>> })
>>>>>
>>>>> newRDD will work like a filter when you do the join.
>>>>>
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jorn,
>>>>>>
>>>>>> I didn't know that it is possible to change the filter without re-opening
>>>>>> the Twitter stream. Actually, I already asked that question earlier on
>>>>>> Stack Overflow
>>>>>> <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming>
>>>>>> and I got the answer that it's not possible, but it would be even better
>>>>>> if there were some other way to add new hashtags or to remove old hashtags
>>>>>> that a user stopped following. I guess the second request would be more
>>>>>> difficult.
>>>>>>
>>>>>> However, it would be great if you could give me a short example of how
>>>>>> to do this. I didn't quite understand from your explanation what you
>>>>>> mean by "join it with a rdd loading the newest hash tags from disk in a
>>>>>> regular interval".
>>>>>>
>>>>>> Thanks,
>>>>>> Zoran
>>>>>>
>>>>>> On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Why do you even want to stop it? You can join it with a rdd loading
>>>>>>> the newest hash tags from disk in a regular interval
>>>>>>>
>>>>>>> On Sun, Jul 19, 2015 at 7:40, Zoran Jeremic <zoran.jere...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have a twitter spark stream initialized in the following way:
>>>>>>>>
>>>>>>>>>     val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
>>>>>>>>>     val config = getTwitterConfigurationBuilder.build()
>>>>>>>>>     val auth: Option[twitter4j.auth.Authorization] =
>>>>>>>>>       Some(new twitter4j.auth.OAuthAuthorization(config))
>>>>>>>>>     val stream = TwitterUtils.createStream(ssc, auth, filters)
>>>>>>>>>
>>>>>>>>
>>>>>>>> This works fine when I initially start it. However, at some point I
>>>>>>>> need to update the filters, since users might add new hashtags they want
>>>>>>>> to follow. I tried to stop the running stream and the Spark streaming
>>>>>>>> context without stopping the Spark context, e.g.:
>>>>>>>>
>>>>>>>>
>>>>>>>>>                stream.stop()
>>>>>>>>>                ssc.stop(false)
>>>>>>>>>
>>>>>>>>
>>>>>>>> Afterward, I try to initialize a new Twitter stream like I did
>>>>>>>> previously. However, I get this exception:
>>>>>>>>
>>>>>>>>> Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
>>>>>>>>>     at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>>>>>>>>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>>>>>>     at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>>>>>>>>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>>>>>>>>     at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>>>>>>>>     at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>>>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>>>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>>>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>>>>>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>>>>>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>>>>>>>>     at java.util.TimerThread.mainLoop(Timer.java:555)
>>>>>>>>>     at java.util.TimerThread.run(Timer.java:505)
>>>>>>>>>  INFO    [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83)         Inflater has been closed
>>>>>>>>> ERROR    [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75)         Error stopping receiver 0org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> Can anybody explain how to solve this issue?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Zoran
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> *******************************************************************************
>>>>>> Zoran Jeremic, PhD
>>>>>> Senior System Analyst & Programmer
>>>>>>
>>>>>> Athabasca University
>>>>>> Tel: +1 604 92 89 944
>>>>>> E-mail: zoran.jere...@gmail.com <zoran.jere...@va.mod.gov.rs>
>>>>>> Homepage:  http://zoranjeremic.org
>>>>>>
>>>>>> **********************************************************************************
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
>
