Use `local[*]` instead of `local` to grab all available cores; using `local` grabs just one.
Dean On Thursday, January 8, 2015, mixtou <[email protected]> wrote: > I am new to Apache Spark, now i am trying my first project "Space Saving > Counting Algorithm" and while it compiles in single core using > .setMaster("local") it fails when using .setMaster("local[4]") or any > number>1. My Code follows: > ============================================================= import > org.apache.spark.{SparkConf, SparkContext} /** * Created by mixtou on > 30/12/14. */ object SpaceSaving { var frequent_words_counters = > scala.collection.immutable.Map[String, Array[Int]](); var guaranteed_words > = scala.collection.immutable.Map[String, Array[Int]](); val top_k: Int = > 100; var words_no: Int = 0; var tStart: Long = 0; var tStop: Long = 0; var > fi: Double = 0.001; def main (args: Array[String]): Unit = { val sparkConf > = new SparkConf().setAppName("Space Saving Project").setMaster("local"); > val ctx = new SparkContext(sparkConf); // val stopwords = > ctx.textFile("/Users/mixtou/PhD/Courses/Databases_Advanced/Project/scala_spark_space_saving/src/main/resources/stopwords.txt"); > val lines = > ctx.textFile("/Users/mixtou/PhD/Courses/Databases_Advanced/Project/scala_spark_space_saving/src/main/resources/README.txt", > 2) .map(line => line.toLowerCase()); val nonEmptyLines = lines.filter(line > => line.nonEmpty); val regex = "[,.:;'\"\\?\\-!\\(\\)\\+\\[\\]\\d+]".r; // > val temp = nonEmptyLines.map(removeStopWords(_, stopwords)); val cleanLines > = nonEmptyLines.map(line => regex.replaceAllIn(line, " ")); val dirtyWords > = cleanLines.flatMap(line => line.split("\\s+")); val words = > dirtyWords.filter(word => word.length > 3); * words.foreach(word => > space_saving_algorithm(word));* *ERROR HERE!*! 
if > (frequent_words_counters.size > 0) { frequent_words_counters.foreach(line > => println("Top Frequent Word: " + line._1 + " with count: " + line._2(0) + > " end error: " + line._2(1))); } System.out.println("=========== > Throughput:=> "+ 1000*(words_no/(tStop - tStart))+ " words per second. " ); > estimateGuaranteedFrequentWords(); words.collect(); ctx.stop(); } def > space_saving_algorithm(word: String): Unit = { // System.out.println(word); > if (frequent_words_counters.contains(word)) { * val count = > frequent_words_counters.get(word).get(0);* *ERROR HERE* val error = > frequent_words_counters.get(word).get(1); // System.out.println("Before: " > + word + " <=> " + count); frequent_words_counters += word -> > Array[Int](count+1, error); // System.out.println("After: " + word + " <=> > " + counters.get(word).get(0)); } else { if (frequent_words_counters.size < > top_k) { frequent_words_counters += word -> Array[Int](1, 0); // > System.out.println("Adding Word to Counters: "+word); } else { > replaceLeastEntry(word); } } if(words_no > 0 ){ tStop = > java.lang.System.currentTimeMillis(); } else{ tStart = > java.lang.System.currentTimeMillis(); } words_no += 1; } def > replaceLeastEntry(word: String): Unit = { var temp_list = > frequent_words_counters.toList.sortWith( (x,y) => x._2(0) > y._2(0) ); val > word_count = temp_list.last._2(0); // System.out.println("Replacing word: " > + temp_list.last._1 + ", having count and error " + temp_list.last._2(0)+" > , " + temp_list.last._2(1) + " with word: "+word); // > System.out.println(temp_list.length); temp_list = > temp_list.take(temp_list.length - 1); frequent_words_counters = > temp_list.toMap[String, Array[Int]]; frequent_words_counters += word -> > Array[Int](word_count+1, word_count); } def > estimateGuaranteedFrequentWords(): Unit = { > frequent_words_counters.foreach{tuple => if (tuple._2(0) - tuple._2(1) < > words_no*fi) { // counters -= tuple._1; guaranteed_words += tuple; // > System.out.println("NEW 
SISZEZEEEE: "+counters.size); } else { > System.out.println("Guaranteed Word : "+tuple._1+" with count: > "+tuple._2(0)+" and error: "+tuple._2(1)); } }; } } The Compiler Error is: > ====================================== 15/01/08 16:44:51 ERROR Executor: > Exception in task 1.0 in stage 0.0 (TID 1) > java.util.NoSuchElementException: None.get at > scala.None$.get(Option.scala:313) at scala.None$.get(Option.scala:311) at > SpaceSaving$.space_saving_algorithm(SpaceSaving.scala:52) at > SpaceSaving$$anonfun$main$1.apply(SpaceSaving.scala:36) at > SpaceSaving$$anonfun$main$1.apply(SpaceSaving.scala:36) at > scala.collection.Iterator$class.foreach(Iterator.scala:727) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at > org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:765) at > org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:765) at > org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) > at > org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at > org.apache.spark.scheduler.Task.run(Task.scala:56) at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > ------------------------------ > View this message in context: Spark Project Fails to run multicore in > local mode. > <http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Project-Fails-to-run-multicore-in-local-mode-tp21034.html> > Sent from the Apache Spark User List mailing list archive > <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com. > -- Dean Wampler, Ph.D. 
Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com
