Hi Akhil and Jorn,
Following your suggestion, I tried a simple scenario, but I get a compile
error on rdd.join(newRDD): "value join is not a member of
org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like this:
> val stream = TwitterUtils.createStream(ssc, auth)
> val filteredStream = stream.transform(rdd => {
>   val samplehashtags = Array("music", "film")
>   val newRDD = samplehashtags.map { x => (x, 1) }
>   rdd.join(newRDD)
> })
>
Did I miss something here?
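
For reference, a minimal sketch of a variant that should compile. It rests on two observations: join is only available on RDDs of key/value pairs, while rdd here is an RDD[Status]; and newRDD above is a plain Scala Array, not an RDD. The names HashtagFilter, hashtagsOf and filterByHashtags are hypothetical, and extracting hashtags with a regex on the tweet text is an assumption (twitter4j's Status.getHashtagEntities would be an alternative). This is a sketch, not a tested implementation.

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits; needed explicitly on Spark < 1.3
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import twitter4j.Status

// Hypothetical helper object, for illustration only.
object HashtagFilter {

  // Lower-cased hashtags found in a tweet's text, without the leading '#'.
  def hashtagsOf(text: String): Seq[String] =
    "#(\\w+)".r.findAllMatchIn(text).map(_.group(1).toLowerCase).toSeq.distinct

  // Keep only tweets that carry at least one of the wanted hashtags.
  def filterByHashtags(stream: DStream[Status], wanted: Seq[String]): DStream[Status] =
    stream.transform { rdd =>
      // join needs RDD[(K, V)] on both sides, so key each tweet by its hashtags...
      val tweetsByTag: RDD[(String, Status)] =
        rdd.flatMap(status => hashtagsOf(status.getText).map(tag => (tag, status)))
      // ...and turn the local collection into an RDD of pairs.
      val wantedTags: RDD[(String, Int)] =
        rdd.sparkContext.parallelize(wanted.map(tag => (tag.toLowerCase, 1)))
      // The inner join drops every tweet whose hashtag is not in wantedTags.
      tweetsByTag.join(wantedTags).map { case (_, (status, _)) => status }
    }
}
```

With this, the failing snippet would become something like val filteredStream = HashtagFilter.filterByHashtags(stream, Seq("music", "film")). Note that a tweet matching two wanted hashtags comes out of the join twice, so a de-duplication step may be needed afterwards.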
Thanks,
Zoran
On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic <[email protected]>
wrote:
> Thanks for the explanation.
>
> If I understand this correctly, with this approach I would stream
> everything from Twitter and do the filtering in my application using
> Spark. Isn't that too much overhead if my application only needs to
> listen for a few hundred or a few thousand hashtags?
> On the one hand, this would be a better approach, since I would not have
> to open new streams once the number of hashtags goes over 400, which is the
> Twitter limit for User stream filtering. On the other hand, I'm concerned
> about how much it will affect application performance if I stream
> everything that is posted on Twitter and filter it locally. It would be
> great if somebody with experience in this could comment on these concerns.
>
> Thanks,
> Zoran
>
> On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <[email protected]>
> wrote:
>
>> Jorn meant something like this:
>>
>> val filteredStream = twitterStream.transform(rdd =>{
>>
>> val newRDD =
>> ssc.sparkContext.textFile("/this/file/will/be/updated/frequently").map(x => (x,1))
>>
>> rdd.join(newRDD)
>>
>> })
>>
>> newRDD will work like a filter when you do the join.
>>
>>
>> Thanks
>> Best Regards
>>
>> On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <[email protected]>
>> wrote:
>>
>>> Hi Jorn,
>>>
>>> I didn't know that it is possible to change the filter without re-opening
>>> the Twitter stream. Actually, I already asked that question earlier on
>>> Stack Overflow
>>> <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming>
>>> and the answer was that it's not possible, but it would be even better if
>>> there were some other way to add new hashtags or to remove old hashtags that
>>> a user has stopped following. I guess the second request would be more difficult.
>>>
>>> However, it would be great if you could give me a short example of how to
>>> do this. I didn't fully understand from your explanation what you mean by
>>> "join it with a rdd loading the newest hash tags from disk in a regular
>>> interval".
>>>
>>> Thanks,
>>> Zoran
>>>
>>> On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <[email protected]>
>>> wrote:
>>>
>>>> Why do you even want to stop it? You can join it with a rdd loading the
>>>> newest hash tags from disk in a regular interval
>>>>
>>>> On Sun, Jul 19, 2015 at 7:40 AM, Zoran Jeremic <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a twitter spark stream initialized in the following way:
>>>>>
>>>>>> val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
>>>>>> val config = getTwitterConfigurationBuilder.build()
>>>>>> val auth: Option[twitter4j.auth.Authorization] =
>>>>>>   Some(new twitter4j.auth.OAuthAuthorization(config))
>>>>>> val stream = TwitterUtils.createStream(ssc, auth, filters)
>>>>>>
>>>>>
>>>>> This works fine when I initially start it. However, at some point I
>>>>> need to update the filters, since users might add new hashtags they want to
>>>>> follow. I tried to stop the running stream and the Spark streaming context
>>>>> without stopping the Spark context, e.g.:
>>>>>
>>>>>
>>>>>> stream.stop()
>>>>>> ssc.stop(false)
>>>>>>
>>>>>
>>>>> Afterward, I try to initialize a new Twitter stream as I did
>>>>> previously, but I get this exception:
>>>>>
>>>>>> Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
>>>>>>     at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>>>>>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>>>     at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>>>>>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>>>>>     at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>>>>>     at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>>>>>     at java.util.TimerThread.mainLoop(Timer.java:555)
>>>>>>     at java.util.TimerThread.run(Timer.java:505)
>>>>>> INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
>>>>>> ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>>>>>
>>>>>
>>>>>>
>>>>>
>>>>> Can anybody explain how to solve this issue?
>>>>>
>>>>> Thanks,
>>>>> Zoran
>>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> *******************************************************************************
>>> Zoran Jeremic, PhD
>>> Senior System Analyst & Programmer
>>>
>>> Athabasca University
>>> Tel: +1 604 92 89 944
>>> E-mail: [email protected] <[email protected]>
>>> Homepage: http://zoranjeremic.org
>>>
>>> **********************************************************************************
>>>
>>
>>
>
>
>