Re: How to restart Twitter spark stream

2015-07-24 Thread Akhil Das
Yes, that is correct, sorry for confusing you. But I guess this is what you
are looking for; let me know if that doesn't help:

val filtered_statuses = stream.transform(rdd => {

  // Instead of hardcoding, you can fetch these from MySQL, a file, or wherever
  val sampleHashTags =
    Array("#teenchoice".toLowerCase, "#android".toLowerCase, "#iphone".toLowerCase)

  val filtered = rdd.filter(status => {
    var found = false
    for (tag <- sampleHashTags) {
      if (status.getText.toLowerCase.contains(tag)) found = true
    }
    found
  })

  filtered
})

Thanks
Best Regards
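
A minimal sketch of the file-based variant mentioned in the comment above,
assuming a hypothetical path with one hashtag per line; since transform runs
on the driver at each batch interval, edits to the file take effect on the
next batch:

val filtered_statuses = stream.transform(rdd => {
  // Hypothetical location and format: one lowercase hashtag per line
  val src = scala.io.Source.fromFile("/path/to/hashtags.txt")
  val sampleHashTags =
    try src.getLines().map(_.trim.toLowerCase).filter(_.nonEmpty).toSet
    finally src.close()

  rdd.filter(status => sampleHashTags.exists(tag => status.getText.toLowerCase.contains(tag)))
})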

Re: How to restart Twitter spark stream

2015-07-24 Thread Zoran Jeremic
Hi Akhil,

That's exactly what I needed. You saved my day :)

Thanks a lot,

Best,
Zoran


Re: How to restart Twitter spark stream

2015-07-23 Thread Zoran Jeremic
Hi Akhil,

Thank you for sending this code. My apologies if I ask something obvious
here, since I'm a newbie in Scala, but I still don't see how I can use this
code. Maybe my original question was not very clear.

What I need is to get each Twitter Status that contains one of the hashtags
I'm following. I'm implementing a learning platform where each course will
have at least one hashtag, e.g. #cs270computersciencecourse. If somebody
posts anything on Twitter with that hashtag, I want to get it and save the
Twitter status in the system, so it can be shown in the application. Other
tweets should be ignored, but each tweet containing one of the hashtags I'm
following should be stored in the application, so I can't process most
popular tweets or anything similar where I might miss somebody's post. There
is a list of hashtags that the stream follows, and it should be possible for
users to update this list.

If I understood well, the code you sent me extracts hashtags from statuses
received through the stream and continues processing those hashtags, but at
the end I will have only hashtags without statuses. Is that correct, or did
I miss something?

Thanks,
Zoran
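
For the storage step described above, a hedged sketch: starting from a
filtered stream of statuses (e.g. the filtered_statuses Akhil posts above),
each matched status can be persisted from foreachRDD. saveStatus is a
hypothetical stand-in for the platform's persistence call, not a real API:

// Sketch only: assumes filtered_statuses: DStream[twitter4j.Status]
filtered_statuses.foreachRDD { rdd =>
  rdd.foreachPartition { statuses =>
    // one connection per partition is cheaper than one per record
    statuses.foreach(status => saveStatus(status.getId, status.getText))
  }
}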




Re: How to restart Twitter spark stream

2015-07-22 Thread Akhil Das
That was pseudo code; a working version would look like this:

val stream = TwitterUtils.createStream(ssc, None)

val hashTags = stream.flatMap(status => status.getText.split(" ")
  .filter(_.startsWith("#"))).map(x => (x.toLowerCase, 1))

val topCounts10 = hashTags.reduceByKeyAndWindow(_ + _, Seconds(10))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false)).map(x => x._2)

topCounts10.print()


val filteredStream = topCounts10.transform(rdd => {
  val samplehashtags = ssc.sparkContext.parallelize(
    Array("#RobinWilliams".toLowerCase, "#android".toLowerCase, "#iphone".toLowerCase))
  val newRDD = samplehashtags.map { x => (x, 1) }
  // rdd carries bare topic strings at this point, so key it before the join
  val joined = newRDD.join(rdd.map(x => (x, 1)))

  joined
})

filteredStream.print()

Thanks
Best Regards


Re: How to restart Twitter spark stream

2015-07-21 Thread Zoran Jeremic
Hi Akhil and Jorn,

I tried, as you suggested, to create a simple scenario, but I get an error
on rdd.join(newRDD): "value join is not a member of
org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like:

val stream = TwitterUtils.createStream(ssc, auth)
val filteredStream = stream.transform(rdd => {
  val samplehashtags = Array("music", "film")
  val newRDD = samplehashtags.map { x => (x, 1) }
  rdd.join(newRDD)
})


Did I miss something here?

Thanks,
Zoran
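
For context, the compiler complains because join is defined only on RDDs of
key/value pairs (via PairRDDFunctions), and rdd here is an
RDD[twitter4j.Status]; samplehashtags.map { ... } also yields a local Array
rather than an RDD. A hedged sketch of one way to key the statuses so a join
compiles (getHashtagEntities is the twitter4j accessor; the tag values are
illustrative):

// Sketch only, assuming rdd: RDD[twitter4j.Status]
val byTag = rdd.flatMap(status =>
  status.getHashtagEntities.map(h => ("#" + h.getText.toLowerCase, status)))
val followed = rdd.sparkContext
  .parallelize(Seq("#music", "#film")).map(tag => (tag, 1))
// the join keeps only statuses whose hashtag appears in followed
val matched = byTag.join(followed).values.map(_._1)  // RDD[twitter4j.Status]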


Re: How to restart Twitter spark stream

2015-07-20 Thread Akhil Das
Jorn meant something like this:

val filteredStream = twitterStream.transform(rdd => {

  val newRDD = ssc.sparkContext.textFile("/this/file/will/be/updated/frequently")
    .map(x => (x, 1))

  rdd.join(newRDD)

})

newRDD will work like a filter when you do the join.


Thanks
Best Regards
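
To make the join-as-filter idea concrete, a tiny stand-alone illustration
(assuming a live SparkContext named sc); only keys present in both RDDs
survive the join:

val tags   = sc.parallelize(Seq(("#android", 1), ("#iphone", 1)))
val tweets = sc.parallelize(Seq(("#android", "status A"), ("#golf", "status B")))
// "#golf" has no match in tags, so the join drops it
tweets.join(tags).collect()  // Array(("#android", ("status A", 1)))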


Re: How to restart Twitter spark stream

2015-07-20 Thread Zoran Jeremic
Thanks for the explanation.

If I understand this correctly, in this approach I would actually stream
everything from Twitter and perform the filtering in my application using
Spark. Isn't this too much overhead if my application is interested in
listening for a couple of hundred or thousand hashtags?
On one side this is the better approach, since I will not have the problem
of opening new streams if the number of hashtags goes over 400, which is the
Twitter limit for user stream filtering; but on the other side I'm concerned
about how much it will affect application performance if I stream everything
that is posted on Twitter and filter it locally. It would be great if
somebody with experience of this could comment on these concerns.

Thanks,
Zoran


Re: How to restart Twitter spark stream

2015-07-19 Thread Jörn Franke
Why do you even want to stop it? You can join it with an RDD that loads the
newest hash tags from disk at a regular interval.


Re: How to restart Twitter spark stream

2015-07-19 Thread Zoran Jeremic
Hi Jorn,

I didn't know that it is possible to change the filter without re-opening the
twitter stream. Actually, I already asked that question earlier on Stack Overflow
http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming
and I got the answer that it's not possible, but it would be even better if
there were some other way to add new hashtags, or to remove old hashtags that
a user stopped following. I guess the second request would be more difficult.

However, it would be great if you could give me a short example of how to do
this. I didn't understand well from your explanation what you mean by "join
it with a rdd loading the newest hash tags from disk in a regular interval".

Thanks,
Zoran
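
On the user-facing side, one hedged way to realize the file-based update is
to rewrite the tag file whenever a user adds or removes a hashtag; the path
matches Akhil's example above, and the one-tag-per-line format is an
assumption. The file must live somewhere Spark can read it (local mode or a
shared filesystem):

import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// Hypothetical helper: persist the current tag set so the next batch of the
// transform/join picks it up. Error handling omitted for brevity.
def saveTags(tags: Set[String]): Unit =
  Files.write(Paths.get("/this/file/will/be/updated/frequently"),
              tags.map(_.toLowerCase).toSeq.sorted.asJava)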



-- 
***
Zoran Jeremic, PhD
Senior System Analyst & Programmer

Athabasca University
Tel: +1 604 92 89 944
E-mail: zoran.jere...@gmail.com zoran.jere...@va.mod.gov.rs
Homepage:  http://zoranjeremic.org
**


How to restart Twitter spark stream

2015-07-18 Thread Zoran Jeremic
Hi,

I have a twitter spark stream initialized in the following way:

  val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
  val config = getTwitterConfigurationBuilder.build()
  val auth: Option[twitter4j.auth.Authorization] =
    Some(new twitter4j.auth.OAuthAuthorization(config))
  val stream = TwitterUtils.createStream(ssc, auth, filters)


This works fine when I initially start it. However, at some point I need to
update the filters, since users might add new hashtags they want to follow. I
tried to stop the running stream and the spark streaming context without
stopping the spark context, e.g.:


stream.stop()
ssc.stop(false)


Afterward, I tried to initialize a new Twitter stream like I did previously.
However, I got this exception:

Exception in thread "Firestorm JMX Monitor"
java.lang.IllegalStateException: Adding new inputs, transformations, and
output operations after stopping a context is not supported
  at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
  at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
  at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
  at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
  at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
  at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
  at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
  at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
  at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
  at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
  at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
  at java.util.TimerThread.mainLoop(Timer.java:555)
  at java.util.TimerThread.run(Timer.java:505)
 INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]]
twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3]
streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping
receiver 0
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)




Can anybody explain how to solve this issue?

Thanks,
Zoran
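
If restarting really is needed (rather than the join/filter approach
suggested above), a hedged sketch of the usual workaround: ssc.stop(false)
leaves the SparkContext alive, so a fresh StreamingContext can be built on
top of it and the stream re-created with the updated filters. The batch
duration and the print() output below are illustrative assumptions:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// Sketch only: rebuild the streaming layer after ssc.stop(false).
def restartTwitterStream(sc: SparkContext,
                         auth: Option[twitter4j.auth.Authorization],
                         filters: Seq[String]): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(1))  // fresh context on the surviving SparkContext
  val stream = TwitterUtils.createStream(ssc, auth, filters)
  stream.print()                                  // attach all outputs before start()
  ssc.start()
  ssc
}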