Hi, I am using Spark 1.1.0. I did set my Twitter credentials and I am using the full paths; I just did not include them in the public post. I am running on a cluster and getting the exception. Are you running in local or standalone mode?
Thanks

On Oct 15, 2014 3:20 AM, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:

> I just ran the same code and it is running perfectly fine on my machine.
> These are the things on my end:
>
> - Spark version: 1.1.0
> - Gave full path to the negative and positive files
> - Set twitter auth credentials in the environment.
>
> And here's the code:
>
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkContext._
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming.twitter.TwitterUtils
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>
>> object Sentimenter {
>>   def main(args: Array[String]) {
>>     System.setProperty("twitter4j.oauth.consumerKey","XXXXXXXXXXXXXXXXX");
>>     System.setProperty("twitter4j.oauth.consumerSecret","XXXXXXXXXXXXXXXXXXXXXXXXX");
>>     System.setProperty("twitter4j.oauth.accessToken","XXXXXXXXXXXXXXXXXXXXXXXXXXXX");
>>     System.setProperty("twitter4j.oauth.accessTokenSecret","XXXXXXXXXXXXXXXXXXXXXXX");
>>
>>     val filters = new Array[String](2)
>>     filters(0) = "ebola"
>>     filters(1) = "isis"
>>     val sparkConf = new SparkConf().setAppName("TweetSentiment").setMaster("local[2]")
>>     val sc = new SparkContext(sparkConf)
>>
>>     // get the list of positive words
>>     val pos_list = sc.textFile("file:///home/akhld/positive-words.txt")
>>       //Random
>>       .filter(line => !line.isEmpty())
>>       .collect()
>>       .toSet
>>
>>     // get the list of negative words
>>     val neg_list = sc.textFile("file:///home/akhld/negative-words.txt")
>>       //Random
>>       .filter(line => !line.isEmpty())
>>       .collect()
>>       .toSet
>>
>>     // create twitter stream
>>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>>     val stream = TwitterUtils.createStream(ssc, None, filters)
>>     val tweets = stream.map(r => r.getText)
>>     tweets.print() // print tweet text
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>
> Thanks
> Best Regards
>
> On Wed, Oct 15, 2014 at 1:43 AM, SK <skrishna...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to implement simple sentiment analysis of Twitter streams in
>> Spark/Scala. I am getting an exception and it appears when I combine
>> SparkContext with StreamingContext in the same program. When I read the
>> positive and negative words using only SparkContext.textFile (without
>> creating a StreamingContext) and analyze static text files, the program
>> works. Likewise, when I just create the twitter stream using
>> StreamingContext (and don't create a SparkContext to create the
>> vocabulary), the program works. The exception seems to be appearing when
>> I combine both SparkContext and StreamingContext in the same program and
>> I am not sure if we are not allowed to have both simultaneously. All the
>> examples in the streaming module contain only the StreamingContext. The
>> error transcript and my code appear below. I would appreciate your
>> guidance in fixing this error and the right way to read static files and
>> streams in the same program or any pointers to relevant examples.
>>
>> Thanks.
>>
>> ----------------------Error transcript -----------------------------
>> Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net):
>> java.io.IOException: unexpected exception type
>>     java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
>>     java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
>>     java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>     java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>     java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>     java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>     java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>     java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>     java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>     java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>     org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>     org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
>>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     java.lang.Thread.run(Thread.java:745)
>>
>> ------------------------------ My code below ------------------------------
>> object TweetSentiment {
>>   def main(args: Array[String]) {
>>
>>     val filters = args
>>     val sparkConf = new SparkConf().setAppName("TweetSentiment")
>>     val sc = new SparkContext(sparkConf)
>>
>>     // get the list of positive words
>>     val pos_list = sc.textFile("positive-words.txt")
>>       .filter(line => !line.isEmpty())
>>       .collect()
>>       .toSet
>>
>>     // get the list of negative words
>>     val neg_list = sc.textFile("negative-words.txt")
>>       .filter(line => !line.isEmpty())
>>       .collect()
>>       .toSet
>>
>>     // create twitter stream
>>     val ssc = new StreamingContext(sparkConf, Seconds(60))
>>     val stream = TwitterUtils.createStream(ssc, None, filters)
>>     val tweets = stream.map(r => r.getText)
>>     tweets.print() // print tweet text
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>     sc.stop() // I tried commenting this, but the exception still appeared.
>>   }
>> }
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
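
One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis of the trace above: the posted code creates a SparkContext and then passes the same SparkConf to the StreamingContext constructor, which builds a second SparkContext in the same driver. StreamingContext also has a constructor that accepts an existing SparkContext, so the static word lists and the Twitter stream can share one context. A minimal sketch along those lines, reusing the names from the posted code. The `file:///path/to/...` locations are placeholders (on a cluster they would need to be readable from every node), and the `score` and `loadWords` helpers are hypothetical additions for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object TweetSentiment {
  // Hypothetical helper: number of positive-word hits minus negative-word hits.
  def score(text: String, pos: Set[String], neg: Set[String]): Int = {
    val words = text.toLowerCase.split("\\W+").filter(_.nonEmpty)
    words.count(pos.contains) - words.count(neg.contains)
  }

  // Hypothetical helper: read a word list into a driver-side Set, skipping blank lines.
  def loadWords(sc: SparkContext, path: String): Set[String] =
    sc.textFile(path).map(_.trim).filter(_.nonEmpty).collect().toSet

  def main(args: Array[String]): Unit = {
    val filters = args
    val sparkConf = new SparkConf().setAppName("TweetSentiment")
    val sc = new SparkContext(sparkConf)

    // Placeholder paths, not the real locations from the thread.
    val posList = loadWords(sc, "file:///path/to/positive-words.txt")
    val negList = loadWords(sc, "file:///path/to/negative-words.txt")

    // Build the StreamingContext on top of the existing SparkContext
    // instead of passing sparkConf, so only one SparkContext exists.
    val ssc = new StreamingContext(sc, Seconds(60))
    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Pair each tweet's text with its word-list score.
    val scored = stream.map { status =>
      val text = status.getText
      (score(text, posList, negList), text)
    }
    scored.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Only the StreamingContext construction differs from the posted program in a way that could matter for the exception; the scoring step is just one simple way to use the two word lists once both contexts coexist.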