I am new to Apache Spark, now i am trying my first project "Space Saving Counting Algorithm" and while it compiles in single core using .setMaster("local") it fails when using .setMaster("local[4]") or any number>1.My Code follows:=============================================================import org.apache.spark.{SparkConf, SparkContext}/** * Created by mixtou on 30/12/14. */object SpaceSaving { var frequent_words_counters = scala.collection.immutable.Map[String, Array[Int]](); var guaranteed_words = scala.collection.immutable.Map[String, Array[Int]](); val top_k: Int = 100; var words_no: Int = 0; var tStart: Long = 0; var tStop: Long = 0; var fi: Double = 0.001; def main (args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("Space Saving Project").setMaster("local"); val ctx = new SparkContext(sparkConf); // val stopwords = ctx.textFile("/Users/mixtou/PhD/Courses/Databases_Advanced/Project/scala_spark_space_saving/src/main/resources/stopwords.txt"); val lines = ctx.textFile("/Users/mixtou/PhD/Courses/Databases_Advanced/Project/scala_spark_space_saving/src/main/resources/README.txt", 2) .map(line => line.toLowerCase()); val nonEmptyLines = lines.filter(line => line.nonEmpty); val regex = "[,.:;'\"\\?\\-!\\(\\)\\+\\[\\]\\d+]".r; // val temp = nonEmptyLines.map(removeStopWords(_, stopwords)); val cleanLines = nonEmptyLines.map(line => regex.replaceAllIn(line, " ")); val dirtyWords = cleanLines.flatMap(line => line.split("\\s+")); val words = dirtyWords.filter(word => word.length > 3); / words.foreach(word => space_saving_algorithm(word));/ *ERROR HERE!*! if (frequent_words_counters.size > 0) { frequent_words_counters.foreach(line => println("Top Frequent Word: " + line._1 + " with count: " + line._2(0) + " end error: " + line._2(1))); } System.out.println("=========== Throughput:=> "+ 1000*(words_no/(tStop - tStart))+ " words per second. " ); estimateGuaranteedFrequentWords(); words.collect(); ctx.stop(); } def space_saving_algorithm(word: String): Unit = {// System.out.println(word); if (frequent_words_counters.contains(word)) { / val count = frequent_words_counters.get(word).get(0);/ *ERROR HERE* val error = frequent_words_counters.get(word).get(1);// System.out.println("Before: " + word + " <=> " + count); frequent_words_counters += word -> Array[Int](count+1, error);// System.out.println("After: " + word + " <=> " + counters.get(word).get(0)); } else { if (frequent_words_counters.size < top_k) { frequent_words_counters += word -> Array[Int](1, 0);// System.out.println("Adding Word to Counters: "+word); } else { replaceLeastEntry(word); } } if(words_no > 0 ){ tStop = java.lang.System.currentTimeMillis(); } else{ tStart = java.lang.System.currentTimeMillis(); } words_no += 1; } def replaceLeastEntry(word: String): Unit = { var temp_list = frequent_words_counters.toList.sortWith( (x,y) => x._2(0) > y._2(0) ); val word_count = temp_list.last._2(0);// System.out.println("Replacing word: " + temp_list.last._1 + ", having count and error " + temp_list.last._2(0)+" , " + temp_list.last._2(1) + " with word: "+word);// System.out.println(temp_list.length); temp_list = temp_list.take(temp_list.length - 1); frequent_words_counters = temp_list.toMap[String, Array[Int]]; frequent_words_counters += word -> Array[Int](word_count+1, word_count); } def estimateGuaranteedFrequentWords(): Unit = { frequent_words_counters.foreach{tuple => if (tuple._2(0) - tuple._2(1) < words_no*fi) {// counters -= tuple._1; guaranteed_words += tuple;// System.out.println("NEW SISZEZEEEE: "+counters.size); } else { System.out.println("Guaranteed Word : "+tuple._1+" with count: "+tuple._2(0)+" and error: "+tuple._2(1)); } }; }}The Compiler Error is:======================================15/01/08 16:44:51 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:313) at scala.None$.get(Option.scala:311) at SpaceSaving$.space_saving_algorithm(SpaceSaving.scala:52) at SpaceSaving$$anonfun$main$1.apply(SpaceSaving.scala:36) at SpaceSaving$$anonfun$main$1.apply(SpaceSaving.scala:36) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:765) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:765) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Project-Fails-to-run-multicore-in-local-mode-tp21034.html Sent from the Apache Spark User List mailing list archive at Nabble.com.