I am new to Apache Spark, now i am trying my first project "Space Saving
Counting Algorithm" and while it compiles in single core using
.setMaster("local") it fails when using .setMaster("local[4]") or any
number>1.My Code
follows:=============================================================import
org.apache.spark.{SparkConf, SparkContext}/** * Created by mixtou on
30/12/14. */object SpaceSaving {  var frequent_words_counters =
scala.collection.immutable.Map[String, Array[Int]]();  var guaranteed_words
= scala.collection.immutable.Map[String, Array[Int]]();  val top_k: Int =
100;  var words_no: Int = 0;  var tStart: Long = 0;  var tStop: Long = 0; 
var fi: Double = 0.001;  def main (args: Array[String]): Unit = {    val
sparkConf = new SparkConf().setAppName("Space Saving
Project").setMaster("local");    val ctx = new SparkContext(sparkConf);   
//    val stopwords =
ctx.textFile("/Users/mixtou/PhD/Courses/Databases_Advanced/Project/scala_spark_space_saving/src/main/resources/stopwords.txt");
   
val lines =
ctx.textFile("/Users/mixtou/PhD/Courses/Databases_Advanced/Project/scala_spark_space_saving/src/main/resources/README.txt",
2)      .map(line => line.toLowerCase());    val nonEmptyLines =
lines.filter(line => line.nonEmpty);    val regex =
"[,.:;'\"\\?\\-!\\(\\)\\+\\[\\]\\d+]".r;    //    val temp =
nonEmptyLines.map(removeStopWords(_, stopwords));    val cleanLines =
nonEmptyLines.map(line => regex.replaceAllIn(line, " "));    val dirtyWords
= cleanLines.flatMap(line => line.split("\\s+"));    val words =
dirtyWords.filter(word => word.length > 3);   / words.foreach(word =>
space_saving_algorithm(word));/ *ERROR HERE!*!    if
(frequent_words_counters.size > 0) {     
frequent_words_counters.foreach(line => println("Top Frequent Word: " +
line._1 + " with count: " + line._2(0) + " end error: " + line._2(1)));    }   
System.out.println("=========== Throughput:=> "+ 1000*(words_no/(tStop -
tStart))+ " words per second. " );    estimateGuaranteedFrequentWords();   
words.collect();    ctx.stop();  }  def space_saving_algorithm(word:
String): Unit = {//    System.out.println(word);    if
(frequent_words_counters.contains(word)) { /     val count =
frequent_words_counters.get(word).get(0);/ *ERROR HERE*      val error =
frequent_words_counters.get(word).get(1);//      System.out.println("Before:
" + word + " <=> " + count);      frequent_words_counters += word ->
Array[Int](count+1, error);//      System.out.println("After: " + word + "
<=> " + counters.get(word).get(0));    }    else {      if
(frequent_words_counters.size < top_k) {        frequent_words_counters +=
word -> Array[Int](1, 0);//        System.out.println("Adding Word to
Counters: "+word);      }      else {        replaceLeastEntry(word);      }   
}    if(words_no > 0 ){      tStop = java.lang.System.currentTimeMillis();   
}    else{      tStart = java.lang.System.currentTimeMillis();    }   
words_no += 1;  }  def replaceLeastEntry(word: String): Unit = {    var
temp_list = frequent_words_counters.toList.sortWith( (x,y) => x._2(0) >
y._2(0) );    val word_count = temp_list.last._2(0);//   
System.out.println("Replacing word: " + temp_list.last._1 + ", having count
and error " + temp_list.last._2(0)+" , " + temp_list.last._2(1) + " with
word: "+word);//    System.out.println(temp_list.length);    temp_list =
temp_list.take(temp_list.length - 1);    frequent_words_counters =
temp_list.toMap[String, Array[Int]];    frequent_words_counters += word ->
Array[Int](word_count+1, word_count);  }  def
estimateGuaranteedFrequentWords(): Unit = {   
frequent_words_counters.foreach{tuple =>      if (tuple._2(0) - tuple._2(1)
< words_no*fi) {//        counters -= tuple._1;        guaranteed_words +=
tuple;//        System.out.println("NEW SISZEZEEEE: "+counters.size);      }    
 
else {        System.out.println("Guaranteed Word : "+tuple._1+" with count:
"+tuple._2(0)+" and error: "+tuple._2(1));      }    };  }}The Compiler
Error is:======================================15/01/08 16:44:51 ERROR
Executor: Exception in task 1.0 in stage 0.0 (TID
1)java.util.NoSuchElementException: None.get    at
scala.None$.get(Option.scala:313)       at scala.None$.get(Option.scala:311)    
at
SpaceSaving$.space_saving_algorithm(SpaceSaving.scala:52)       at
SpaceSaving$$anonfun$main$1.apply(SpaceSaving.scala:36) at
SpaceSaving$$anonfun$main$1.apply(SpaceSaving.scala:36) at
scala.collection.Iterator$class.foreach(Iterator.scala:727)     at
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)  at
org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:765)        at
org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:765)        at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)   at
org.apache.spark.scheduler.Task.run(Task.scala:56)      at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)   at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Project-Fails-to-run-multicore-in-local-mode-tp21034.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Reply via email to