Use local[*] instead of local to grab all available cores; using local grabs
just one.

Dean

On Thursday, January 8, 2015, mixtou <[email protected]> wrote:

> I am new to Apache Spark, now i am trying my first project "Space Saving
> Counting Algorithm" and while it compiles in single core using
> .setMaster("local") it fails when using .setMaster("local[4]") or any
> number>1. My Code follows:
> ============================================================= import
> org.apache.spark.{SparkConf, SparkContext} /** * Created by mixtou on
> 30/12/14. */ object SpaceSaving { var frequent_words_counters =
> scala.collection.immutable.Map[String, Array[Int]](); var guaranteed_words
> = scala.collection.immutable.Map[String, Array[Int]](); val top_k: Int =
> 100; var words_no: Int = 0; var tStart: Long = 0; var tStop: Long = 0; var
> fi: Double = 0.001; def main (args: Array[String]): Unit = { val sparkConf
> = new SparkConf().setAppName("Space Saving Project").setMaster("local");
> val ctx = new SparkContext(sparkConf); // val stopwords =
> ctx.textFile("/Users/mixtou/PhD/Courses/Databases_Advanced/Project/scala_spark_space_saving/src/main/resources/stopwords.txt");
> val lines =
> ctx.textFile("/Users/mixtou/PhD/Courses/Databases_Advanced/Project/scala_spark_space_saving/src/main/resources/README.txt",
> 2) .map(line => line.toLowerCase()); val nonEmptyLines = lines.filter(line
> => line.nonEmpty); val regex = "[,.:;'\"\\?\\-!\\(\\)\\+\\[\\]\\d+]".r; //
> val temp = nonEmptyLines.map(removeStopWords(_, stopwords)); val cleanLines
> = nonEmptyLines.map(line => regex.replaceAllIn(line, " ")); val dirtyWords
> = cleanLines.flatMap(line => line.split("\\s+")); val words =
> dirtyWords.filter(word => word.length > 3); *words.foreach(word =>
> space_saving_algorithm(word));* *ERROR HERE!* if
> (frequent_words_counters.size > 0) { frequent_words_counters.foreach(line
> => println("Top Frequent Word: " + line._1 + " with count: " + line._2(0) +
> " end error: " + line._2(1))); } System.out.println("===========
> Throughput:=> "+ 1000*(words_no/(tStop - tStart))+ " words per second. " );
> estimateGuaranteedFrequentWords(); words.collect(); ctx.stop(); } def
> space_saving_algorithm(word: String): Unit = { // System.out.println(word);
> if (frequent_words_counters.contains(word)) { *val count =
> frequent_words_counters.get(word).get(0);* *ERROR HERE* val error =
> frequent_words_counters.get(word).get(1); // System.out.println("Before: "
> + word + " <=> " + count); frequent_words_counters += word ->
> Array[Int](count+1, error); // System.out.println("After: " + word + " <=>
> " + counters.get(word).get(0)); } else { if (frequent_words_counters.size <
> top_k) { frequent_words_counters += word -> Array[Int](1, 0); //
> System.out.println("Adding Word to Counters: "+word); } else {
> replaceLeastEntry(word); } } if(words_no > 0 ){ tStop =
> java.lang.System.currentTimeMillis(); } else{ tStart =
> java.lang.System.currentTimeMillis(); } words_no += 1; } def
> replaceLeastEntry(word: String): Unit = { var temp_list =
> frequent_words_counters.toList.sortWith( (x,y) => x._2(0) > y._2(0) ); val
> word_count = temp_list.last._2(0); // System.out.println("Replacing word: "
> + temp_list.last._1 + ", having count and error " + temp_list.last._2(0)+"
> , " + temp_list.last._2(1) + " with word: "+word); //
> System.out.println(temp_list.length); temp_list =
> temp_list.take(temp_list.length - 1); frequent_words_counters =
> temp_list.toMap[String, Array[Int]]; frequent_words_counters += word ->
> Array[Int](word_count+1, word_count); } def
> estimateGuaranteedFrequentWords(): Unit = {
> frequent_words_counters.foreach{tuple => if (tuple._2(0) - tuple._2(1) <
> words_no*fi) { // counters -= tuple._1; guaranteed_words += tuple; //
> System.out.println("NEW SISZEZEEEE: "+counters.size); } else {
> System.out.println("Guaranteed Word : "+tuple._1+" with count:
> "+tuple._2(0)+" and error: "+tuple._2(1)); } }; } } The Compiler Error is:
> ====================================== 15/01/08 16:44:51 ERROR Executor:
> Exception in task 1.0 in stage 0.0 (TID 1)
> java.util.NoSuchElementException: None.get at
> scala.None$.get(Option.scala:313) at scala.None$.get(Option.scala:311) at
> SpaceSaving$.space_saving_algorithm(SpaceSaving.scala:52) at
> SpaceSaving$$anonfun$main$1.apply(SpaceSaving.scala:36) at
> SpaceSaving$$anonfun$main$1.apply(SpaceSaving.scala:36) at
> scala.collection.Iterator$class.foreach(Iterator.scala:727) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:765) at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:765) at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at
> org.apache.spark.scheduler.Task.run(Task.scala:56) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> ------------------------------
> View this message in context: Spark Project Fails to run multicore in
> local mode.
> <http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Project-Fails-to-run-multicore-in-local-mode-tp21034.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>


-- 
Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

Reply via email to