Spark 0.9.1 - saveAsTextFile() exception: _temporary doesn't exist!
Hi All,

After a few simple transformations I am trying to save to the local file system. The code works in local mode but not on a standalone cluster. The directory 1.txt/_temporary does exist after the exception. I would appreciate any suggestions.

    scala> d3.sample(false, 0.01, 1).map( pair => pair._2 ).saveAsTextFile("1.txt")

    14/06/09 22:06:40 ERROR TaskSetManager: Task 0.0:0 failed 4 times; aborting job
    org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 4 times (most recent failure: Exception failure: java.io.IOException: The temporary job-output directory file:/data/spark-0.9.1-bin-hadoop1/1.txt/_temporary doesn't exist!)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thank you,
Oleg
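For reference, a minimal sketch of the same pipeline with corrected syntax. The HDFS URL is a hypothetical stand-in: on a multi-node cluster the output directory generally must be reachable from every node (e.g. HDFS or a shared mount), since a plain local path resolves separately on each machine, which can make the _temporary job-output directory created on one node invisible to the others.

    // d3 is the RDD from the thread; the output URL below is hypothetical.
    d3.sample(false, 0.01, 1)
      .map(pair => pair._2)
      .saveAsTextFile("hdfs://namenode:9000/user/oleg/1.txt")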
Re: Using Java functions in Spark
Increasing the number of partitions on the data file solved the problem.

On 6 June 2014 18:46, Oleg Proudnikov oleg.proudni...@gmail.com wrote:

Additional observation - the map and mapValues are pipelined and executed - as expected - in pairs. This means there is a simple sequence of steps for each value of K - first a read from Cassandra, then the processing. This is the exact behaviour of a normal Java loop with these two steps inside. I understand that this avoids loading everything in a batch first and piling up massive text arrays. The keys are also relatively evenly distributed across the Executors. The question is - why is this still so slow? I would appreciate any suggestions on where to focus my search.

Thank you,
Oleg

On 6 June 2014 16:24, Oleg Proudnikov oleg.proudni...@gmail.com wrote:

Hi All,

I am passing Java static methods into the RDD transformations map and mapValues. The first map is from a simple string K into a (K,V) pair where V is a Java ArrayList of large text strings, 50K each, read from Cassandra. mapValues then processes these text blocks into very small ArrayLists. The code runs quite slowly compared to running the same work in parallel on the same servers from plain Java. I gave the same heap to the Executors and to Java. Does Java run slower under Spark, do I suffer from excess heap pressure, or am I missing something?

Thank you for any insight,
Oleg

--
Kind regards,
Oleg
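A sketch of the fix described above, assuming the data is read with sc.textFile; the partition count of 64 and the path are illustrative, not taken from the thread:

    // Ask for more partitions at read time (the second argument is a minimum-partitions hint) ...
    val data = sc.textFile("hdfs:///path/to/data", 64)
    // ... or repartition an RDD that already exists.
    val wider = data.repartition(64)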
Re: Setting executor memory when using spark-shell
Thank you, Andrew!

On 5 June 2014 23:14, Andrew Ash and...@andrewash.com wrote:

Oh my apologies - that was for 1.0. For Spark 0.9 I did it like this:

    MASTER=spark://mymaster:7077 SPARK_MEM=8g ./bin/spark-shell -c $CORES_ACROSS_CLUSTER

The downside of this, though, is that SPARK_MEM also sets the driver's JVM to 8g, rather than just the executors'. I think this is the reason why SPARK_MEM was deprecated. See https://github.com/apache/spark/pull/99

On Thu, Jun 5, 2014 at 2:37 PM, Oleg Proudnikov oleg.proudni...@gmail.com wrote:

Thank you, Andrew. I am using Spark 0.9.1 and tried your approach like this:

    bin/spark-shell --driver-java-options -Dspark.executor.memory=$MEMORY_PER_EXECUTOR

I get: bad option: '--driver-java-options'

There must be something different in my setup. Any ideas?

Thank you again,
Oleg

On 5 June 2014 22:28, Andrew Ash and...@andrewash.com wrote:

Hi Oleg,

I set the size of my executors on a standalone cluster when using the shell like this:

    ./bin/spark-shell --master $MASTER --total-executor-cores $CORES_ACROSS_CLUSTER --driver-java-options -Dspark.executor.memory=$MEMORY_PER_EXECUTOR

It doesn't seem particularly clean, but it works.

Andrew

On Thu, Jun 5, 2014 at 2:15 PM, Oleg Proudnikov oleg.proudni...@gmail.com wrote:

Hi All,

Please help me set the Executor JVM memory size. I am using the Spark shell, and it appears that the executors are started with a predefined JVM heap of 512m as soon as the shell starts. How can I change this setting? I tried setting SPARK_EXECUTOR_MEMORY before launching the Spark shell:

    export SPARK_EXECUTOR_MEMORY=1g

I also tried several other approaches:

1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker
2) passing it as the -m argument and running bin/start-slave.sh 1 -m 1g on the worker

Thank you,
Oleg

--
Kind regards,
Oleg
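For a standalone application - as opposed to spark-shell, whose context is created at startup - the executor memory can also be set programmatically. A minimal sketch, assuming the Spark 0.9 SparkConf API; the master URL and app name are hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}

    // Set executor memory before the SparkContext is created.
    val conf = new SparkConf()
      .setMaster("spark://mymaster:7077")       // hypothetical master URL
      .setAppName("MemoryExample")
      .set("spark.executor.memory", "1g")       // per-executor JVM heap
    val sc = new SparkContext(conf)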
Re: Setting executor memory when using spark-shell
Thank you, Hassan!

On 6 June 2014 03:23, hassan hellfire...@gmail.com wrote:

just use -Dspark.executor.memory=

--
Kind regards,
Oleg
Using Java functions in Spark
Hi All,

I am passing Java static methods into the RDD transformations map and mapValues. The first map is from a simple string K into a (K,V) pair where V is a Java ArrayList of large text strings, 50K each, read from Cassandra. mapValues then processes these text blocks into very small ArrayLists.

The code runs quite slowly compared to running the same work in parallel on the same servers from plain Java. I gave the same heap to the Executors and to Java. Does Java run slower under Spark, do I suffer from excess heap pressure, or am I missing something?

Thank you for any insight,
Oleg
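A minimal sketch of the pattern described above. TextFetcher and TextParser are hypothetical stand-ins for the Java classes with static methods mentioned in the thread, not real APIs:

    import java.util.ArrayList
    import org.apache.spark.SparkContext._   // implicit PairRDDFunctions (needed pre-Spark 1.3)
    import org.apache.spark.rdd.RDD

    // Hypothetical static helpers standing in for the real Cassandra read and text processing.
    object TextFetcher { def fetch(k: String): ArrayList[String] = new ArrayList() }
    object TextParser  { def parse(v: ArrayList[String]): ArrayList[String] = v }

    def run(keys: RDD[String]): RDD[(String, ArrayList[String])] = {
      val byKey = keys.map(k => (k, TextFetcher.fetch(k)))   // K -> (K, V): large text blocks per key
      byKey.mapValues(v => TextParser.parse(v))              // shrink each block to a small list
    }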
Re: Setting executor memory when using spark-shell
Thank you, Patrick. I am planning to switch to 1.0 now.

By way of feedback - I used Andrew's suggestion and found that it does exactly that - it sets the Executor JVM heap and nothing else. The Workers have already been started, and when the shell starts it is able to control the Executor JVM heap.

Thank you again,
Oleg

On 6 June 2014 18:05, Patrick Wendell pwend...@gmail.com wrote:

In 1.0+ you can just pass the --executor-memory flag to ./bin/spark-shell.

On Fri, Jun 6, 2014 at 12:32 AM, Oleg Proudnikov oleg.proudni...@gmail.com wrote:

Thank you, Hassan!

On 6 June 2014 03:23, hassan hellfire...@gmail.com wrote:

just use -Dspark.executor.memory=

--
Kind regards,
Oleg
Re: Using Java functions in Spark
Additional observation - the map and mapValues are pipelined and executed - as expected - in pairs. This means there is a simple sequence of steps for each value of K - first a read from Cassandra, then the processing. This is the exact behaviour of a normal Java loop with these two steps inside. I understand that this avoids loading everything in a batch first and piling up massive text arrays. The keys are also relatively evenly distributed across the Executors. The question is - why is this still so slow? I would appreciate any suggestions on where to focus my search.

Thank you,
Oleg

On 6 June 2014 16:24, Oleg Proudnikov oleg.proudni...@gmail.com wrote:

Hi All,

I am passing Java static methods into the RDD transformations map and mapValues. The first map is from a simple string K into a (K,V) pair where V is a Java ArrayList of large text strings, 50K each, read from Cassandra. mapValues then processes these text blocks into very small ArrayLists. The code runs quite slowly compared to running the same work in parallel on the same servers from plain Java. I gave the same heap to the Executors and to Java. Does Java run slower under Spark, do I suffer from excess heap pressure, or am I missing something?

Thank you for any insight,
Oleg

--
Kind regards,
Oleg
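The pipelining described above can be pictured as each key flowing through both functions back-to-back within a single task. A conceptual sketch; fetch and parse are hypothetical stand-ins for the Java static methods:

    // What pipelined map + mapValues do per partition, written as a plain loop.
    def pipelined(keys: Iterator[String],
                  fetch: String => java.util.ArrayList[String],
                  parse: java.util.ArrayList[String] => java.util.ArrayList[String])
        : Iterator[(String, java.util.ArrayList[String])] =
      keys.map { k =>
        val v = fetch(k)     // step 1: read from Cassandra for this one key
        (k, parse(v))        // step 2: process immediately, before the next key is read
      }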
Setting executor memory when using spark-shell
Hi All,

Please help me set the Executor JVM memory size. I am using the Spark shell, and it appears that the executors are started with a predefined JVM heap of 512m as soon as the shell starts. How can I change this setting? I tried setting SPARK_EXECUTOR_MEMORY before launching the Spark shell:

    export SPARK_EXECUTOR_MEMORY=1g

I also tried several other approaches:

1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker
2) passing it as the -m argument and running bin/start-slave.sh 1 -m 1g on the worker

Thank you,
Oleg
Re: Setting executor memory when using spark-shell
Thank you, Andrew. I am using Spark 0.9.1 and tried your approach like this:

    bin/spark-shell --driver-java-options -Dspark.executor.memory=$MEMORY_PER_EXECUTOR

I get: bad option: '--driver-java-options'

There must be something different in my setup. Any ideas?

Thank you again,
Oleg

On 5 June 2014 22:28, Andrew Ash and...@andrewash.com wrote:

Hi Oleg,

I set the size of my executors on a standalone cluster when using the shell like this:

    ./bin/spark-shell --master $MASTER --total-executor-cores $CORES_ACROSS_CLUSTER --driver-java-options -Dspark.executor.memory=$MEMORY_PER_EXECUTOR

It doesn't seem particularly clean, but it works.

Andrew

On Thu, Jun 5, 2014 at 2:15 PM, Oleg Proudnikov oleg.proudni...@gmail.com wrote:

Hi All,

Please help me set the Executor JVM memory size. I am using the Spark shell, and it appears that the executors are started with a predefined JVM heap of 512m as soon as the shell starts. How can I change this setting? I tried setting SPARK_EXECUTOR_MEMORY before launching the Spark shell:

    export SPARK_EXECUTOR_MEMORY=1g

I also tried several other approaches:

1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker
2) passing it as the -m argument and running bin/start-slave.sh 1 -m 1g on the worker

Thank you,
Oleg

--
Kind regards,
Oleg
Re: RDD with a Map
Just a thought... Are you trying to use the RDD as a Map?

On 3 June 2014 23:14, Doris Xin doris.s@gmail.com wrote:

Hey Amit,

You might want to check out PairRDDFunctions: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions. For your use case in particular, you can load the file as an RDD[(String, String)] and then use the groupByKey() function in PairRDDFunctions to get an RDD[(String, Iterable[String])].

Doris

On Tue, Jun 3, 2014 at 2:56 PM, Amit Kumar kumarami...@gmail.com wrote:

Hi Folks,

I am new to Spark - and this is probably a basic question. I have a file on HDFS:

    1, one
    1, uno
    2, two
    2, dos

I want to create a multimap RDD, RDD[Map[String, List[String]]]:

    {1 -> [one, uno], 2 -> [two, dos]}

First I read the file:

    val identityData: RDD[String] = sc.textFile($path_to_the_file, 2).cache()

    val identityDataList: RDD[List[String]] = identityData.map { line =>
      val splits = line.split(",")
      splits.toList
    }

Then I group them by the first element:

    val grouped: RDD[(String, Iterable[List[String]])] = identityDataList.groupBy { element =>
      element(0)
    }

Then I do the equivalent of mapValues of Scala collections to get rid of the first element:

    val groupedWithValues: RDD[(String, List[String])] = grouped.flatMap[(String, List[String])] {
      case (key, list) =>
        List((key, list.map { element => element(1) }.toList))
    }

For this to actually materialize I do collect:

    val groupedAndCollected = groupedWithValues.collect()

I get an Array[(String, List[String])]. I am trying to figure out if there is a way for me to get a Map[String, List[String]] (a multimap), or to create an RDD[Map[String, List[String]]].

I am sure there is something simpler. I would appreciate advice.

Many thanks,
Amit

--
Kind regards,
Oleg
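Doris's groupByKey suggestion in compact form - a sketch, assuming Spark 1.0 (where groupByKey yields Iterable values), a spark-shell sc, and a hypothetical input path; collectAsMap() then materialises the result on the driver as a multimap-like Map:

    import org.apache.spark.SparkContext._   // implicit PairRDDFunctions
    import org.apache.spark.rdd.RDD

    // Hypothetical path; each line looks like "1, one"
    val pairs: RDD[(String, String)] = sc.textFile("hdfs:///path/to/the/file").map { line =>
      val Array(k, v) = line.split(",", 2)
      (k.trim, v.trim)
    }
    val grouped = pairs.groupByKey()       // RDD[(String, Iterable[String])]
    val asMap   = grouped.collectAsMap()   // Map[String, Iterable[String]] on the driver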
Re: Can this be done in map-reduce technique (in parallel)
It is possible if you use a cartesian product to produce all possible pairs for each IP address and two stages of map-reduce:
- first by pairs of points, to find the total for each pair, and
- second by IP address, to find the pair with the maximum count for each IP address.

See the sketch after this message.

Oleg

On 4 June 2014 11:49, lmk lakshmi.muralikrish...@gmail.com wrote:

Hi,

I am a new Spark user. Please let me know how to handle the following scenario:

I have a data set with the following fields:
1. DeviceId
2. latitude
3. longitude
4. ip address
5. Datetime
6. Mobile application name

With the above data, I would like to perform the following steps:

1. Collect all lat and lon for each ipaddress:
    (ip1, (lat1, lon1), (lat2, lon2))
    (ip2, (lat3, lon3), (lat4, lon4))
2. For each IP:
    1. Find the distance between each lat/lon coordinate pair and all the other pairs under the same IP.
    2. Select those coordinates whose distances fall under a specific threshold (say 100m).
    3. Find the coordinate pair with the maximum occurrences.

In this case, how can I iterate and compare each coordinate pair with all the other pairs? Can this be done in a distributed manner, as this data set is going to have a few million records? Can we do this in map/reduce commands?

Thanks.

--
Kind regards,
Oleg
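A sketch of the two-stage approach described above, assuming the points are already keyed by IP. The self-join plays the role of the per-IP cartesian product, and dist is a hypothetical planar placeholder - a real implementation would use the haversine formula to get metres from lat/lon:

    import org.apache.spark.SparkContext._   // implicit PairRDDFunctions
    import org.apache.spark.rdd.RDD

    type Point = (Double, Double)

    // Hypothetical placeholder distance; swap in haversine for real lat/lon metres.
    def dist(a: Point, b: Point): Double =
      math.sqrt(math.pow(a._1 - b._1, 2) + math.pow(a._2 - b._2, 2))

    def bestPointPerIp(points: RDD[(String, Point)], threshold: Double): RDD[(String, (Point, Int))] = {
      // Stage 0: all pairs of points under the same IP (per-IP cartesian product via self-join)
      val pairs = points.join(points)
        .filter { case (_, (a, b)) => a != b && dist(a, b) <= threshold }
      // Stage 1: reduce by (ip, point) to count how often each point is within the threshold
      val counts = pairs.map { case (ip, (a, _)) => ((ip, a), 1) }.reduceByKey(_ + _)
      // Stage 2: reduce by IP to keep the point with the maximum count
      counts.map { case ((ip, p), n) => (ip, (p, n)) }
            .reduceByKey { (x, y) => if (x._2 >= y._2) x else y }
    }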
Reconnect to an application/RDD
Hi All,

Is it possible to run a standalone app that would compute and persist/cache an RDD, and then run other standalone apps that would gain access to that RDD?

--
Thank you,
Oleg
Re: sc.textFileGroupByPath(*/*.txt)
Anwar,

I will try this, as it might do exactly what I need. I will follow your pattern but use sc.textFile() for each file. I am now thinking that I could start with an RDD of file paths and map it into (path, content) pairs, provided I could read a file on the server.

Thank you,
Oleg

On 1 June 2014 18:41, Anwar Rizal anriza...@gmail.com wrote:

I presume that you need access to the path of each file you are reading. I don't know whether there is a good way to do that for HDFS; I need to read the files myself, something like:

    import java.net.URI
    import scala.collection.mutable.ListBuffer
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.SparkContext

    def openWithPath(inputPath: String, sc: SparkContext) = {
      val path = new Path(inputPath)
      val fs = path.getFileSystem(sc.hadoopConfiguration)
      val filesIt = fs.listFiles(path, false)
      val paths = new ListBuffer[URI]
      while (filesIt.hasNext) {
        paths += filesIt.next.getPath.toUri
      }
      val withPaths = paths.toList.map { p =>
        sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](p.toString).map {
          case (_, s) => (p, s.toString)
        }
      }
      withPaths.reduce { _ ++ _ }
    }

I would be interested if there is a better way to do the same thing...

Cheers,
Anwar

On Sun, Jun 1, 2014 at 6:00 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

Could you provide an example of what you mean? I know it's possible to create an RDD from a path with wildcards, like in the subject. For example, sc.textFile('s3n://bucket/2014-??-??/*.gz'). You can also provide a comma delimited list of paths.

Nick

On Sunday, 1 June 2014, Oleg Proudnikov oleg.proudni...@gmail.com wrote:

Hi All,

Is it possible to create an RDD from a directory tree of the following form?

    RDD[(PATH, Seq[TEXT])]

Thank you,
Oleg

--
Kind regards,
Oleg
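A possible usage sketch for openWithPath above; the directory is hypothetical, and note that the function yields one (path, line) pair per line rather than one pair per file. As a side note, Spark 1.0 added sc.wholeTextFiles(path), which returns (filename, content) pairs for whole files directly:

    // Hypothetical usage of the openWithPath sketch from Anwar's message
    val linesWithPaths = openWithPath("hdfs:///data/input", sc)   // RDD[(URI, String)]
    linesWithPaths.take(3).foreach { case (path, line) => println(s"$path: $line") }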