Re: How to restart Twitter spark stream
Yes, that is correct, sorry for confusing you. But I guess this is what you are looking for; let me know if it doesn't help:

    val filtered_statuses = stream.transform(rdd => {
      // Instead of hardcoding, you can fetch these from MySQL, a file, or wherever
      val sampleHashTags = Array("#teenchoice".toLowerCase, "#android".toLowerCase, "#iphone".toLowerCase)
      val filtered = rdd.filter(status => {
        var found = false
        for (tag <- sampleHashTags) {
          if (status.getText.toLowerCase.contains(tag)) found = true
        }
        found
      })
      filtered
    })

Thanks
Best Regards
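The same check can be written without the mutable flag by using exists. This is only an idiomatic rewrite of the snippet above, with the same hardcoded sample tags, assuming `stream` is the DStream[twitter4j.Status] created earlier in the thread:

    val filteredStatuses = stream.transform(rdd => {
      val sampleHashTags = Array("#teenchoice", "#android", "#iphone").map(_.toLowerCase)
      rdd.filter(status => {
        val text = status.getText.toLowerCase
        // keep the status as soon as any followed hashtag appears in its text
        sampleHashTags.exists(tag => text.contains(tag))
      })
    })

Note that contains-based matching also hits tweets where a followed tag is a prefix of a longer hashtag (for example #android inside a hypothetical #androiddev); splitting the text into tokens and comparing them exactly is an option if that matters.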
Re: How to restart Twitter spark stream
Hi Akhil,

That's exactly what I needed. You saved my day :)

Thanks a lot,
Best,
Zoran
Re: How to restart Twitter spark stream
Hi Akhil,

Thank you for sending this code. My apologies if I'm asking something obvious here, since I'm a newbie in Scala, but I still don't see how I can use this code. Maybe my original question was not very clear.

What I need is to get each Twitter status that contains one of the hashtags I'm following. I'm implementing a learning platform where each course will have at least one hashtag, e.g. #cs270computersciencecourse. If somebody posts anything on Twitter with that hashtag, I want to get it and save the Twitter status in the system, so it can be shown in the application. Other tweets should be ignored, but every tweet containing one of the hashtags I'm following should be stored in the application, so I can't process only the most popular tweets or anything similar where I might miss somebody's post. There is a list of hashtags followed by the stream, and it should be possible for users to update this list.

If I understood correctly, the code you sent extracts hashtags from the statuses received through the stream and continues processing those hashtags, but in the end I will have only hashtags without the statuses. Is that correct, or did I miss something?

Thanks,
Zoran
Re: How to restart Twitter spark stream
That was pseudo code; a working version would look like this:

    val stream = TwitterUtils.createStream(ssc, None)
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
      .map(x => (x.toLowerCase, 1))

    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false)).map(x => x._2)
    topCounts10.print()

    val filteredStream = topCounts10.transform(rdd => {
      val samplehashtags = ssc.sparkContext.parallelize(
        Array("#RobinWilliams".toLowerCase, "#android".toLowerCase, "#iphone".toLowerCase))
      val newRDD = samplehashtags.map { x => (x, 1) }
      val joined = newRDD.join(rdd)
      joined
    })
    filteredStream.print()

Thanks
Best Regards
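One concern raised elsewhere in this thread is that this pipeline ends with hashtags (and counts) rather than the statuses themselves. A possible sketch, not taken from the thread, keys every status by the hashtags it contains and inner-joins with the followed tags, so the full twitter4j.Status objects survive the filtering; the tag list here is a placeholder, and `ssc` and `stream` are assumed to be set up as above:

    val followedTags = Array("#android", "#iphone").map(_.toLowerCase)  // placeholder list

    val statusesByTag = stream.transform(rdd => {
      val tagsRDD = ssc.sparkContext.parallelize(followedTags).map(tag => (tag, ()))
      // key every status by each hashtag appearing in its text
      val keyed = rdd.flatMap(status =>
        status.getText.split(" ").filter(_.startsWith("#")).map(tag => (tag.toLowerCase, status)))
      // the inner join drops statuses with no followed hashtag, leaving (hashtag, status) pairs
      keyed.join(tagsRDD).mapValues { case (status, _) => status }
    })
    statusesByTag.print()

A status containing several followed hashtags will appear once per matching tag.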
Re: How to restart Twitter spark stream
Hi Akhil and Jorn,

I tried, as you suggested, to create a simple scenario, but I get an error on rdd.join(newRDD): "value join is not a member of org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like:

    val stream = TwitterUtils.createStream(ssc, auth)
    val filteredStream = stream.transform(rdd => {
      val samplehashtags = Array("music", "film")
      val newRDD = samplehashtags.map { x => (x, 1) }
      rdd.join(newRDD)
    })

Did I miss something here?

Thanks,
Zoran
Re: How to restart Twitter spark stream
Jorn meant something like this:

    val filteredStream = twitterStream.transform(rdd => {
      val newRDD = ssc.sparkContext.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
      rdd.join(newRDD)
    })

newRDD will work like a filter when you do the join.

Thanks
Best Regards
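As written, `rdd` in that snippet is still an RDD of twitter4j.Status, so joining it directly raises the "value join is not a member of RDD[twitter4j.Status]" error reported elsewhere in the thread. A sketch of the same file-backed idea that type-checks by filtering instead of joining is below. Because the function passed to transform runs on the driver for every batch, the tag file is re-read each batch interval, so appending a hashtag to the file changes the filter without restarting the stream. The path is the placeholder from the message above, and `ssc` and `stream` are assumed to be defined as elsewhere in the thread.

    val tagFile = "/this/file/will/be/updated/frequently"  // one lowercase hashtag per line

    val filteredStream = stream.transform(rdd => {
      // re-read and collect the (small) tag list on the driver for this batch
      val tags = ssc.sparkContext.textFile(tagFile).map(_.trim.toLowerCase).collect().toSet
      rdd.filter(status => {
        val text = status.getText.toLowerCase
        tags.exists(tag => text.contains(tag))
      })
    })
    filteredStream.print()

For a list of a few hundred or a few thousand tags, collecting to the driver and shipping the set in the task closure is cheap; for much larger lists, a join against a pair RDD keyed by hashtag avoids that.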
Re: How to restart Twitter spark stream
Thanks for the explanation. If I understand this correctly, in this approach I would actually stream everything from Twitter and perform the filtering in my application using Spark. Isn't that too much overhead if my application only wants to listen for a couple of hundred or a few thousand hashtags?

On one hand this is the better approach, since I will not have the problem of opening new streams once the number of hashtags goes over 400, which is the Twitter limit for user stream filtering. On the other hand, I'm concerned about how much it will affect application performance if I stream everything that is posted on Twitter and filter it locally. It would be great if somebody with experience here could comment on these concerns.

Thanks,
Zoran
Re: How to restart Twitter spark stream
Why do you even want to stop it? You can join it with an RDD that loads the newest hashtags from disk at a regular interval.
Re: How to restart Twitter spark stream
Hi Jorn,

I didn't know that it is possible to change the filter without re-opening the Twitter stream. I actually asked about this earlier on Stack Overflow (http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming) and was told that it's not possible, but it would be even better if there were some other way to add new hashtags, or to remove old hashtags that users have stopped following. I guess the second request would be more difficult.

However, it would be great if you could give me a short example of how to do this. I didn't quite understand from your explanation what you mean by joining it with an RDD that loads the newest hashtags from disk at a regular interval.

Thanks,
Zoran
How to restart Twitter spark stream
Hi,

I have a Twitter Spark stream initialized in the following way:

    val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
    val config = getTwitterConfigurationBuilder.build()
    val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
    val stream = TwitterUtils.createStream(ssc, auth, filters)

This works fine when I initially start it. However, at some point I need to update the filters, since users might add new hashtags they want to follow. I tried to stop the running stream and the Spark streaming context without stopping the Spark context, e.g.:

    stream.stop()
    ssc.stop(false)

Afterwards, I try to initialize a new Twitter stream like I did previously. However, I get this exception:

    Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
        at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
        at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
        at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
        at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
        at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
        at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
        at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
        at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
        at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
        at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
        at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
        at java.util.TimerThread.mainLoop(Timer.java:555)
        at java.util.TimerThread.run(Timer.java:505)

    INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
    ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)

Can anybody explain how to solve this issue?

Thanks,
Zoran
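If restarting really is required (for example, to change the filters argument passed to TwitterUtils.createStream), the exception above is expected: once a StreamingContext has been stopped, no new inputs, transformations, or outputs can be added to it, so the usual pattern is to build a fresh StreamingContext on top of the long-lived SparkContext and start again. A rough sketch under that assumption; the helper name buildPipeline and the 10-second batch interval are illustrative, not from the thread:

    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.twitter.TwitterUtils

    // Illustrative helper: each restart creates a new StreamingContext,
    // reusing the existing SparkContext.
    def buildPipeline(sc: SparkContext,
                      auth: Option[twitter4j.auth.Authorization],
                      filters: Seq[String]): StreamingContext = {
      val ssc = new StreamingContext(sc, Seconds(10))
      val stream = TwitterUtils.createStream(ssc, auth, filters)
      stream.print()  // real processing/saving of statuses would go here
      ssc
    }

    // On a filter change: stop streaming but keep the SparkContext, then rebuild.
    // ssc.stop(stopSparkContext = false, stopGracefully = true)
    // ssc = buildPipeline(sc, auth, newFilters)
    // ssc.start()

The approach the thread eventually settles on avoids the restart entirely: keep a single unfiltered stream and filter inside transform against a hashtag list that is reloaded on every batch.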