Hi Akhil and Jorn,

I tried, as you suggested, to create a simple scenario, but I get an error on rdd.join(newRDD): "value join is not a member of org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like this:

val stream = TwitterUtils.createStream(ssc, auth)
val filteredStream = stream.transform(rdd => {
  val samplehashtags = Array("music", "film")
  val newRDD = samplehashtags.map { x => (x, 1) }
  rdd.join(newRDD)
})

Did I miss something here?

Thanks,
Zoran

On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:
> Thanks for the explanation.
>
> If I understand this correctly, with this approach I would actually stream
> everything from Twitter and perform the filtering in my application using
> Spark. Isn't this too much overhead if my application is interested in
> listening for a couple of hundred or a few thousand hashtags?
> On one hand, this would be a better approach, since I would not have the
> problem of opening new streams when the number of hashtags goes over 400,
> which is the Twitter limit for user stream filtering. On the other hand, I'm
> concerned about how much it will affect application performance if I stream
> everything that is posted on Twitter and filter it locally. It would be
> great if somebody with experience in this could comment on these concerns.
>
> Thanks,
> Zoran
>
> On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Jorn meant something like this:
>>
>> val filteredStream = twitterStream.transform(rdd => {
>>   val newRDD =
>>     ssc.sparkContext.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
>>   rdd.join(newRDD)
>> })
>>
>> newRDD will work like a filter when you do the join.
>>
>> Thanks
>> Best Regards
>>
>> On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com>
>> wrote:
>>
>>> Hi Jorn,
>>>
>>> I didn't know that it is possible to change the filter without re-opening
>>> the Twitter stream.
>>> Actually, I already asked that question earlier on
>>> Stack Overflow
>>> <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming>
>>> and got the answer that it's not possible, but it would be even better if
>>> there were some other way to add new hashtags or to remove old hashtags that
>>> a user stopped following. I guess the second request would be more difficult.
>>>
>>> However, it would be great if you could give me a short example of how to
>>> do this. I didn't quite understand from your explanation what you mean by
>>> "join it with a rdd loading the newest hash tags from disk in a regular
>>> interval".
>>>
>>> Thanks,
>>> Zoran
>>>
>>> On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com>
>>> wrote:
>>>
>>>> Why do you even want to stop it? You can join it with a rdd loading the
>>>> newest hash tags from disk in a regular interval.
>>>>
>>>> On Sun, Jul 19, 2015 at 7:40 AM, Zoran Jeremic <zoran.jere...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a Twitter Spark stream initialized in the following way:
>>>>>
>>>>>> val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
>>>>>> val config = getTwitterConfigurationBuilder.build()
>>>>>> val auth: Option[twitter4j.auth.Authorization] =
>>>>>>   Some(new twitter4j.auth.OAuthAuthorization(config))
>>>>>> val stream = TwitterUtils.createStream(ssc, auth, filters)
>>>>>>
>>>>>
>>>>> This works fine when I initially start it. However, at some point I
>>>>> need to update the filters, since users might add new hashtags they want
>>>>> to follow. I tried to stop the running stream and the Spark streaming
>>>>> context without stopping the Spark context, e.g.:
>>>>>
>>>>>> stream.stop()
>>>>>> ssc.stop(false)
>>>>>>
>>>>>
>>>>> Afterward, I'm trying to initialize a new Twitter stream as I did
>>>>> previously.
>>>>> However, I got this exception:
>>>>>
>>>>>> Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
>>>>>>     at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>>>>>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>>>     at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>>>>>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>>>>>     at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>>>>>     at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>>>>>     at java.util.TimerThread.mainLoop(Timer.java:555)
>>>>>>     at java.util.TimerThread.run(Timer.java:505)
>>>>>> INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
>>>>>> ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0
>>>>>> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>>>>
>>>>> Can anybody explain how to solve this issue?
>>>>>
>>>>> Thanks,
>>>>> Zoran
>>>>>
>>>>
>>>
>>> --
>>>
>>> *******************************************************************************
>>> Zoran Jeremic, PhD
>>> Senior System Analyst & Programmer
>>>
>>> Athabasca University
>>> Tel: +1 604 92 89 944
>>> E-mail: zoran.jere...@gmail.com <zoran.jere...@va.mod.gov.rs>
>>> Homepage: http://zoranjeremic.org
>>>
>>> **********************************************************************************
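The "value join is not a member of RDD[twitter4j.Status]" error comes from the types involved: `join` is only available on key/value RDDs (`RDD[(K, V)]`, through Spark's implicit `PairRDDFunctions` conversion), and its argument must itself be an RDD. In the code above, `rdd` is an `RDD[twitter4j.Status]` and `newRDD` is a plain Scala `Array`. The following is a minimal, untested sketch of the join-based filter Jorn and Akhil describe. It assumes Spark 1.3 or later (earlier versions also need `import org.apache.spark.SparkContext._` for the pair-RDD implicits), the `spark-streaming-twitter` artifact and twitter4j on the classpath, and a text file with one hashtag per line. The object `HashtagJoinFilter`, its methods, and the tag normalization are hypothetical names chosen for illustration. Each tweet is keyed by its hashtags, joined against the tracked hashtags, and de-duplicated by tweet id so that a tweet carrying several tracked hashtags is emitted only once.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import twitter4j.Status

// Hypothetical helper object; the names are illustrative only.
object HashtagJoinFilter {

  // Normalize both sides of the join the same way: trim, drop a leading '#', lower-case.
  def normalizeTag(tag: String): String = tag.trim.stripPrefix("#").toLowerCase

  // Turn RDD[Status] into a pair RDD keyed by hashtag, which makes join available.
  def keyByHashtag(rdd: RDD[Status]): RDD[(String, Status)] =
    rdd.flatMap { status =>
      status.getHashtagEntities.map(entity => (normalizeTag(entity.getText), status))
    }

  // Keep only tweets carrying at least one hashtag listed in trackedFile (one per line).
  def filterByHashtags(stream: DStream[Status], trackedFile: String): DStream[Status] =
    stream.transform { rdd =>
      // The body of transform runs on the driver once per batch, so the file is
      // re-read for every batch and edits to it apply from the next batch on.
      val tracked: RDD[(String, Int)] = rdd.sparkContext
        .textFile(trackedFile)
        .map(normalizeTag)
        .filter(_.nonEmpty)
        .distinct()
        .map(tag => (tag, 1))

      keyByHashtag(rdd)
        .join(tracked)                                           // RDD[(String, (Status, Int))]
        .map { case (_, (status, _)) => (status.getId, status) } // key by tweet id
        .reduceByKey((first, _) => first)                        // one copy per tweet id
        .values
    }
}
```

Wired into the code from the question, this would read `val filteredStream = HashtagJoinFilter.filterByHashtags(TwitterUtils.createStream(ssc, auth), "/this/file/will/be/updated/frequently")`. For a fixed list such as `samplehashtags`, the `tracked` RDD could instead be built with `rdd.sparkContext.parallelize(samplehashtags.map(x => (x, 1)))`. Regarding the overhead concern: as far as I know, the Spark 1.x Twitter receiver connects to Twitter's public sample stream when no filters are given, so a purely local join only sees that sample, not every tweet.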
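For the original exception (creating a new Twitter stream on a `StreamingContext` that has already been stopped): a stopped `StreamingContext` cannot be restarted or given new DStreams. However, the Spark Streaming programming guide states that a `SparkContext` can be reused to create a new `StreamingContext`, as long as the previous one was stopped without stopping the `SparkContext`. The sketch below assumes that route. `TwitterStreamRestarter`, `restart`, and the `print()` output operation are hypothetical placeholders, and `batchInterval` stands for whatever interval the application already uses. It only swaps the filter list; tweets posted while the receiver is reconnecting are still missed.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.auth.Authorization

// Hypothetical helper: keeps one active StreamingContext and rebuilds it when the filters change.
object TwitterStreamRestarter {
  private var current: Option[StreamingContext] = None

  def restart(sc: SparkContext, batchInterval: Duration,
              auth: Option[Authorization], filters: Seq[String]): StreamingContext = synchronized {
    // Stop only the streaming side; the SparkContext stays alive for reuse.
    current.foreach(_.stop(stopSparkContext = false, stopGracefully = true))

    // A stopped StreamingContext rejects new DStreams, so build a fresh one on the same SparkContext.
    val ssc = new StreamingContext(sc, batchInterval)
    val stream = TwitterUtils.createStream(ssc, auth, filters)

    // Placeholder output operation: start() fails if no output operation is registered.
    stream.map(_.getText).print()

    ssc.start()
    current = Some(ssc)
    ssc
  }
}
```

Keeping the previous context in `current` ensures it is stopped before the next one is created, since only one `StreamingContext` may be active in a JVM at a time.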