Re: PySpark script works by itself, but fails when called from another script
I've tried adding task.py to pyFiles during SparkContext creation and it worked perfectly. Thanks for your help! If you need some more information for further investigation, here's what I've noticed. Without explicitly adding the file to the SparkContext, only functions defined in the main module run by PySpark can be passed to distributed jobs. E.g. if I define myfunc() in runner.py (and run runner.py), it works fine. But if I define myfunc() in task.py (and still run runner.py), it fails as I've described above. I've posted the stderr from the failed executor here: http://pastebin.com/NHNW3sTY, but essentially it just says that the Python worker crashed without any reference to the cause. For the sake of completeness, here's also the console output: http://pastebin.com/Lkvdfhhz . To be clear: all these errors occur only in my initial setup; adding task.py to the SparkContext fixes it. Hope this helps.

Thanks,
Andrei

On Sat, Nov 16, 2013 at 2:12 PM, Andrei faithlessfri...@gmail.com wrote:

Hi, thanks for your replies. I'm out of the office now, so I will check it out on Monday morning, but the guess about serialization/deserialization looks plausible.

Thanks,
Andrei

On Sat, Nov 16, 2013 at 11:11 AM, Jey Kottalam j...@cs.berkeley.edu wrote:

Hi Andrei, could you please post the stderr logfile from the failed executor? You can find this in the work subdirectory of the worker that had the failed task. You'll need the executor id to find the corresponding stderr file.

Thanks,
-Jey

On Friday, November 15, 2013, Andrei wrote:

I have 2 Python modules/scripts - task.py and runner.py. The first one (task.py) is a little Spark job and works perfectly well by itself. However, when called from runner.py with exactly the same arguments, it fails with only a useless message (both in the terminal and in the worker logs):

org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)

Below is the code for both task.py and runner.py:

task.py
---
#!/usr/bin/env pyspark

from __future__ import print_function
from pyspark import SparkContext


def process(line):
    return line.strip()


def main(spark_master, path):
    sc = SparkContext(spark_master, 'My Job')
    rdd = sc.textFile(path)
    rdd = rdd.map(process)  # this line causes troubles when called from runner.py
    count = rdd.count()
    print(count)


if __name__ == '__main__':
    main('spark://spark-master-host:7077',
         'hdfs://hdfs-namenode-host:8020/path/to/file.log')

runner.py
---
#!/usr/bin/env pyspark

import task

if __name__ == '__main__':
    task.main('spark://spark-master-host:7077',
              'hdfs://hdfs-namenode-host:8020/path/to/file.log')
---

So, what's the difference between calling a PySpark-enabled script directly and as a Python module? What are good rules for writing multi-module Python programs with Spark?

Thanks,
Andrei
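[For anyone landing on this thread later, here is a minimal sketch of the fix Andrei describes: pass task.py via the pyFiles argument when creating the SparkContext, so that functions defined in task.py can be shipped to and unpickled on the workers. The master URL and HDFS path are the same placeholders used in the original code.]

# task.py, with only main() changed: the module ships itself to the executors.
from __future__ import print_function
from pyspark import SparkContext


def process(line):
    return line.strip()


def main(spark_master, path):
    # pyFiles distributes task.py to every worker, so process() can be
    # deserialized there even when this module is imported from runner.py.
    sc = SparkContext(spark_master, 'My Job', pyFiles=['task.py'])
    rdd = sc.textFile(path)
    rdd = rdd.map(process)
    print(rdd.count())


if __name__ == '__main__':
    main('spark://spark-master-host:7077',
         'hdfs://hdfs-namenode-host:8020/path/to/file.log')

[Calling sc.addPyFile('task.py') after the context is created should achieve the same thing.]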
How to efficiently manage resources across a cluster and avoid GC overhead limit exceeded errors?
Hi,

I have a cluster of 20 servers, each having 24 cores and 30GB of RAM allocated to Spark. Spark runs in STANDALONE mode. I am trying to load some 200+GB of files and cache the rows using .cache(). What I would like to do is the following (at the moment from the Scala console):
- Evenly load the files across the 20 servers (preferably using all 20*24 cores for the load)
- Verify that the data are loaded as NODE_LOCAL

Looking at the :4040 console, in some runs I see a lot of NODE_LOCAL but in others a lot of ANY. Is there a way to identify what that TID is doing in ANY?

If I allocate less than roughly double the memory I need, I get an OutOfMemory error. If I use the textFile int parameter, i.e. sc.textFile("hdfs://...", 20), then the error goes away. On the other hand, if I allocate enough memory, I can see from the admin console that some of my workers have too much load and some others less than half. I understand that I could use a partitioner to balance my data, but I wouldn't expect an OOME if nodes are significantly under-used. Am I missing something?

Thanks,
Ioannis Deligiannis
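[A minimal spark-shell sketch of the load being described, assuming a placeholder HDFS path; the second argument to textFile is the minimum number of splits (here one per core, 20 servers * 24 cores), which is the int parameter mentioned above.]

val rdd = sc.textFile("hdfs://namenode:8020/path/to/200gb-of-files", 20 * 24)
rdd.cache()
rdd.count()                    // forces the load; task locality (NODE_LOCAL vs ANY) shows up per stage on :4040
println(rdd.partitions.size)   // how many partitions were actually created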
Re: App master failed to find application jar in the master branch on YARN
Hey Jiacheng Guo, do you have the SPARK_EXAMPLES_JAR env variable set? If you do, you have to add the --addJars parameter to the YARN client and point it to the spark examples jar. Or just unset the SPARK_EXAMPLES_JAR env variable. You should only have to set the SPARK_JAR env variable. If that isn't the issue, let me know the build command you used, the hadoop version, and your defaultFs for hadoop.

Tom

On Saturday, November 16, 2013 2:32 AM, guojc guoj...@gmail.com wrote:

Hi, after reading about the exciting progress in consolidating shuffle, I'm eager to try out the latest master branch. However, upon launching the example application, the job failed with a message saying the app master failed to find the target jar:

appDiagnostics: Application application_1384588058297_0017 failed 1 times due to AM Container for appattempt_1384588058297_0017_01 exited with exitCode: -1000 due to: java.io.FileNotFoundException: File file:/${my_work_dir}/spark/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar does not exist.

Is there any change in how to launch a YARN job now?

Best Regards,
Jiacheng Guo
TeraSort on Spark
Hello Spark community,

I wanted to ask if any work has been done on porting TeraSort (TeraGen/Sort/Validate) from Hadoop to Spark on EC2/EMR. I am looking for some guidance on lessons learned from this or similar efforts, as we are trying to do some benchmarking on some of the newer EC2 instances to determine how to optimize in-memory processing on these instances with Spark for some of AWS' customers looking to move to Spark for their data processing workloads. Any guidance the community can provide on this effort is greatly appreciated!

Thanks,
Dario Rivera
Solutions Architect
Cell: 571-205-2731
Email: dar...@amazon.com
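[Not aware of an existing port, but as a rough sketch of what the sort phase itself looks like in Spark: the snippet below assumes the input has already been generated as text lines with a 10-byte key prefix (real TeraGen output is binary, so a proper port would use a binary InputFormat), and the HDFS paths are placeholders.]

import org.apache.spark.SparkContext._   // for sortByKey on pair RDDs

val input  = sc.textFile("hdfs://namenode:8020/terasort/input")
val sorted = input
  .map(line => (line.substring(0, 10), line.substring(10)))   // (key, payload)
  .sortByKey()
sorted.map { case (k, v) => k + v }.saveAsTextFile("hdfs://namenode:8020/terasort/output")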
Re: foreachPartition in Java
Great, I will use mapPartitions instead. Thanks for the advice,
Yadid

On 11/17/13 8:13 PM, Aaron Davidson wrote:

Also, in general, you can work around shortcomings in the Java API by converting to a Scala RDD (using JavaRDD's rdd() method). The API tends to be much clunkier since you have to jump through some hoops to talk to a Scala API from Java, though. In this case, JavaRDD's mapPartitions() method will likely be the cleanest solution, as Patrick said.

On Sun, Nov 17, 2013 at 5:03 PM, Patrick Wendell pwend...@gmail.com wrote:

Can you just call mapPartitions and ignore the result?

- Patrick

On Sun, Nov 17, 2013 at 4:45 PM, Yadid Ayzenberg ya...@media.mit.edu wrote:

Hi, according to the API, foreachPartition() is not yet implemented in Java. Are there any workarounds to get the same functionality? I have a non-serializable DB connection and instantiating it is pretty expensive, so I prefer to do it on a per-partition basis.

thanks,
Yadid
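[A sketch of the mapPartitions workaround Patrick and Aaron describe, assuming the Spark 0.8-era Java API (FlatMapFunction over a partition Iterator, returning an Iterable); ExpensiveDbConnection is a hypothetical stand-in for the non-serializable connection, created once per partition.]

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class ForeachPartitionWorkaround {

  // Hypothetical placeholder for an expensive, non-serializable DB connection.
  static class ExpensiveDbConnection {
    void write(String record) { /* a real insert would go here */ }
    void close() { }
  }

  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "foreachPartition workaround");
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2);

    rdd.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
      public Iterable<Integer> call(Iterator<String> records) {
        ExpensiveDbConnection conn = new ExpensiveDbConnection(); // one connection per partition
        while (records.hasNext()) {
          conn.write(records.next());
        }
        conn.close();
        return Collections.<Integer>emptyList(); // result is ignored; only the side effect matters
      }
    }).count(); // mapPartitions is lazy, so an action is needed to force it to run

    sc.stop();
  }
}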
Fast Data Processing with Spark
Hello,

Has anyone read the Fast Data Processing with Spark book (http://www.amazon.com/Fast-Processing-Spark-Holden-Karau/dp/1782167064/ref=sr_1_1?ie=UTF8qid=1384791032sr=8-1keywords=fast+spark+data+processing)? Any reviews or opinions about the material? I'm thinking of buying the book so I can learn more about Spark, but I didn't see any comments.

Thanks,
Atte.
Rafael R.
Re: App master failed to find application jar in the master branch on YARN
Hi Tom,

I'm on Hadoop 2.0.5. I can launch applications with the Spark 0.8 release normally. However, when I switch to the git master branch version, with the application built against it, I get the jar-not-found exception, and the same happens with the example application. I have tried both the file:// protocol and the hdfs:// protocol, with the jar in the local file system and in HDFS respectively, and even tried the jars list parameter when creating the SparkContext. The exception is slightly different for the hdfs protocol and the local file path.

My application launch command is:

SPARK_JAR=/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar /home/work/guojiacheng/spark/spark-class org.apache.spark.deploy.yarn.Client --jar /home/work/guojiacheng/spark-auc/target/scala-2.9.3/SparkAUC-assembly-0.1.jar --class myClass.SparkAUC --args -c --args yarn-standalone --args -i --args hdfs://{hdfs_host}:9000/user/work/guojiacheng/data --args -m --args hdfs://{hdfs_host}:9000/user/work/guojiacheng/model_large --args -o --args hdfs://{hdfs_host}:9000/user/work/guojiacheng/score --num-workers 60 --master-memory 6g --worker-memory 7g --worker-cores 1

And my build command is:

SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly

The only thing I can think of that might be related is that each cluster node has an env variable SPARK_HOME pointing to a copy of the 0.8 version's location, and its bin folder is in the PATH environment variable. The 0.9 version is not there. It was something left over from when the cluster was set up. But I don't know whether it is related, as my understanding is that the YARN version tries to distribute Spark through YARN.

hdfs version error message:

appDiagnostics: Application application_1384588058297_0056 failed 1 times due to AM Container for appattempt_1384588058297_0056_01 exited with exitCode: -1000 due to: RemoteTrace: java.io.FileNotFoundException: File file:/home/work/.sparkStaging/application_1384588058297_0056/SparkAUC-assembly-0.1.jar does not exist

local version error message:

appDiagnostics: Application application_1384588058297_0066 failed 1 times due to AM Container for appattempt_1384588058297_0066_01 exited with exitCode: -1000 due to: java.io.FileNotFoundException: File file:/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar does not exist

Best Regards,
Jiacheng Guo

On Mon, Nov 18, 2013 at 10:34 PM, Tom Graves tgraves...@yahoo.com wrote:

Hey Jiacheng Guo, do you have the SPARK_EXAMPLES_JAR env variable set? If you do, you have to add the --addJars parameter to the YARN client and point it to the spark examples jar. Or just unset the SPARK_EXAMPLES_JAR env variable. You should only have to set the SPARK_JAR env variable. If that isn't the issue, let me know the build command you used, the hadoop version, and your defaultFs for hadoop.

Tom

On Saturday, November 16, 2013 2:32 AM, guojc guoj...@gmail.com wrote:

Hi, after reading about the exciting progress in consolidating shuffle, I'm eager to try out the latest master branch. However, upon launching the example application, the job failed with a message saying the app master failed to find the target jar.

appDiagnostics: Application application_1384588058297_0017 failed 1 times due to AM Container for appattempt_1384588058297_0017_01 exited with exitCode: -1000 due to: java.io.FileNotFoundException: File file:/${my_work_dir}/spark/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar does not exist.

Is there any change in how to launch a YARN job now?

Best Regards,
Jiacheng Guo
Re: code review - splitting columns
This is in response to your question about something in the API that already does this. You might want to keep your eye on MLI (http://www.mlbase.org), which is a columnar table written for machine learning but applicable to a lot of problems. It's not perfect right now.

On Fri, Nov 15, 2013 at 7:56 PM, Aaron Davidson ilike...@gmail.com wrote:

Regarding only your last point, you could always split backwards to avoid having to worry about updated indices (i.e., split the highest-index column first). But if you're additionally worried about efficiency, a combined approach could make more sense to avoid making two full passes over the data. Otherwise, I don't see anything particularly amiss here, but I'm no expert.

On Wed, Nov 13, 2013 at 3:00 PM, Philip Ogren philip.og...@oracle.com wrote:

Hi Spark community,

I learned a lot the last time I posted some elementary Spark code here. So, I thought I would do it again. Someone politely tell me offline if this is noise or unfair use of the list! I acknowledge that this borders on asking Scala 101 questions.

I have an RDD[List[String]] corresponding to columns of data, and I want to split one of the columns using some arbitrary function and return an RDD updated with the new columns. Here is the code I came up with:

def splitColumn(columnsRDD: RDD[List[String]], columnIndex: Int, numSplits: Int,
                splitFx: String => List[String]): RDD[List[String]] = {

  def insertColumns(columns: List[String]): List[String] = {
    val split = columns.splitAt(columnIndex)
    val left = split._1
    val splitColumn = split._2.head
    val splitColumns = splitFx(splitColumn).padTo(numSplits, "").take(numSplits)
    val right = split._2.tail
    left ++ splitColumns ++ right
  }

  columnsRDD.map(columns => insertColumns(columns))
}

Here is a simple test that demonstrates the behavior:

val spark = new SparkContext("local", "test spark")
val testStrings = List(List("1.2", "a b"), List("3.4", "c d e"), List("5.6", "f"))
var testRDD: RDD[List[String]] = spark.parallelize(testStrings)
testRDD = splitColumn(testRDD, 0, 2, _.split("\\.").toList)
testRDD = splitColumn(testRDD, 2, 2, _.split(" ").toList)  // Line 5
val actualStrings = testRDD.collect.toList
assertEquals(4, actualStrings(0).length)
assertEquals("1, 2, a, b", actualStrings(0).mkString(", "))
assertEquals(4, actualStrings(1).length)
assertEquals("3, 4, c, d", actualStrings(1).mkString(", "))
assertEquals(4, actualStrings(2).length)
assertEquals("5, 6, f, ", actualStrings(2).mkString(", "))

My first concern about this code is that I'm missing out on something that does exactly this in the API. This seems like such a common use case that I would not be surprised if there's a readily available way to do this. I'm a little uncertain about the typing of splitColumn - i.e. the first parameter and the return value. It seems like a general solution wouldn't require every column to be a String value. I'm also annoyed that line 5 in the test code requires that I use an updated index to split what was originally the second column. This suggests that perhaps I should split all the columns that need splitting in one function call - but it seems like doing that would require an unwieldy function signature.

Any advice or insight is appreciated!

Thanks,
Philip
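[A minimal sketch of Aaron's "split backwards" suggestion, using the same splitColumn and test data as above: splitting the highest-index column first leaves the earlier indices untouched, so both calls can use the original column positions.]

// Split original column 1 ("a b") first, then original column 0 ("1.2");
// since column 0 sits to the left of the newly inserted columns, its index is still valid.
testRDD = splitColumn(testRDD, 1, 2, _.split(" ").toList)
testRDD = splitColumn(testRDD, 0, 2, _.split("\\.").toList)
// The resulting rows are the same as before, e.g. List("1", "2", "a", "b")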
Re: Spark Avro in Scala
Agree with Eugen that you should use Kryo. But even better is to embed your Avro objects inside of Kryo. This allows you to have the benefits of both Avro and Kryo.

Here's example code for using Avro with Kryo:
https://github.com/massie/adam/blob/master/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/serialization/AdamKryoRegistrator.scala

You need to register all of your Avro SpecificClasses with Kryo and have it use the AvroSerializer class to encode/decode them, e.g.:

class AdamKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MySpecificAvroClass], new AvroSerializer[MySpecificAvroClass]())
  }
}

--
Matt Massie
UC Berkeley AMPLab
Twitter: @matt_massie (https://twitter.com/matt_massie), @amplab (https://twitter.com/amplab)
https://amplab.cs.berkeley.edu/

On Mon, Nov 18, 2013 at 10:45 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote:

Hi Robert,

The problem is that Spark uses Java serialization, requiring serialized objects to implement Serializable, and AvroKey doesn't. As a workaround you can try using kryo (http://spark.incubator.apache.org/docs/latest/tuning.html#data-serialization) for the serialization.

Eugen

2013/11/11 Robert Fink ursula2...@gmail.com

Hi,

I am trying to get the following minimal Scala example to work: using Spark to process Avro records. Here's my dummy Avro definition:

{
  "namespace": "com.avrotest",
  "type": "record",
  "name": "AvroTest",
  "fields": [
    {"name": "field1", "type": ["string", "null"]}
  ]
}

I experiment with a simple job that creates three AvroTest objects, writes them out to a file, and then reads in the thus-generated Avro file and performs a simple grouping operation:

// -----
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.file.DataFileWriter
import org.apache.avro._
import org.apache.avro.generic._
import org.apache.hadoop.mapreduce.Job
import com.avrotest.AvroTest
import java.io.File

object SparkTest {
  def main(args: Array[String]) {
    def avrofile = "output.avro"
    def sc = new SparkContext("local", "Simple App")
    val job = new Job()

    val record1 = new AvroTest()
    record1.setField1("value1")
    val record2 = new AvroTest()
    record2.setField1("value1")
    val record3 = new AvroTest()
    record3.setField1("value2")

    def userDatumWriter = new SpecificDatumWriter[AvroTest]()
    val dataFileWriter = new DataFileWriter[AvroTest](userDatumWriter)
    def file = new File(avrofile)
    dataFileWriter.create(record1.getSchema(), file)
    dataFileWriter.append(record1)
    dataFileWriter.append(record2)
    dataFileWriter.append(record3)
    dataFileWriter.close()

    def rdd = sc.newAPIHadoopFile(
      avrofile,
      classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[AvroTest]],
      classOf[org.apache.avro.mapred.AvroKey[AvroTest]],
      classOf[org.apache.hadoop.io.NullWritable],
      job.getConfiguration)
    // rdd.foreach( x => println(x._1.datum.getField1) ) // Prints value1, value1, value2
    val numGroups = rdd.groupBy(x => x._1.datum.getField1).count()
  }
}
// -----

I would expect numGroups == 2 in the last step, because record1 and record2 share getField1() == "value1", and record3 has getField1() == "value2". However, the script fails to execute with the following error (see below). Can anyone give me a hint what could be wrong in the above code, or post an example of reading from an Avro file and performing some simple computations on the retrieved objects? Thank you so much!

Robert.

11650 [pool-109-thread-1] WARN org.apache.avro.mapreduce.AvroKeyInputFormat - Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.
11661 [pool-109-thread-1] INFO org.apache.avro.mapreduce.AvroKeyInputFormat - Using a reader schema equal to the writer schema.
12293 [spark-akka.actor.default-dispatcher-5] INFO org.apache.spark.scheduler.local.LocalTaskSetManager - Loss was due to java.io.NotSerializableException
java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at
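[Following Matt's suggestion above, here is a sketch of how such a registrator could be wired into the job, using Spark 0.8-style system properties set before the SparkContext is created; com.avrotest.AvroTestKryoRegistrator is a hypothetical name for a registrator like the one Matt showed.]

import org.apache.spark.SparkContext

// Tell Spark to use Kryo and to pick up the registrator that knows about the Avro classes.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "com.avrotest.AvroTestKryoRegistrator")

val sc = new SparkContext("local", "Simple App")
// ... the rest of the job (newAPIHadoopFile, groupBy, etc.) stays the same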
debugging a Spark error
Hi,

I'm trying to figure out what the problem is with a job that we are running on Spark 0.7.3. When we write out via saveAsTextFile we get an exception that doesn't reveal much:

13/11/18 15:06:19 INFO cluster.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Map failed
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:849)
    at spark.storage.DiskStore.getBytes(DiskStore.scala:86)
    at spark.storage.DiskStore.getValues(DiskStore.scala:92)
    at spark.storage.BlockManager.getLocal(BlockManager.scala:284)
    at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1027)
    at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at spark.storage.BlockFetcherIterator.<init>(BlockManager.scala:1026)
    at spark.storage.BlockManager.getMultiple(BlockManager.scala:478)
    at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:51)
    at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:10)
    at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
    at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:115)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
    at spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:115)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
    at spark.RDD.iterator(RDD.scala:196)
    at spark.MappedValuesRDD.compute(PairRDDFunctions.scala:704)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
    at spark.RDD.iterator(RDD.scala:196)
    at spark.FlatMappedValuesRDD.compute(PairRDDFunctions.scala:714)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
    at spark.RDD.iterator(RDD.scala:196)
    at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
    at spark.RDD.iterator(RDD.scala:196)
    at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
    at spark.RDD.iterator(RDD.scala:196)
    at spark.scheduler.ResultTask.run(ResultTask.scala:77)
    at spark.executor.Executor$TaskRunner.run(Executor.scala:100)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

Any ideas?

-Chris
Re: DataFrame RDDs
Maybe I'm wrong, but this use case could be a good fit for Shapeless (https://github.com/milessabin/shapeless) records.

Shapeless' records are like, so to say, Lisp's records, but typed! In that sense, they're closer to Haskell's record notation, but imho less powerful, since access is based on a String (field name) in Shapeless where Haskell uses pure functions. Anyway, this documentation (https://github.com/milessabin/shapeless/wiki/Feature-overview%3a-shapeless-2.0.0#extensible-records) is self-explanatory and shows how we could (maybe) use them to simulate an R frame.

Thinking out loud: when reading a csv file, for instance, what would be needed are
* a Read[T] for each column,
* fold'ing the list of columns by reading each one and prepending the result (combined with the name via ->>) to an HList.

The gain would be that we would recover one helpful feature of R's frame, which is:
R     :: frame$newCol = frame$post - frame$pre // which adds a column to a frame
Shpls :: frame2 = frame + ("newCol" ->> (frame("post") - frame("pre"))) // type-safe difference between ints, for instance

Of course, we're not recovering R's frame as-is, because we're simply dealing with rows one by one, where a frame deals with the full table -- but in the case of Spark it would make no sense to mimic that, since we use RDDs for that :-D.

I didn't experiment with this yet, but it'd be fun to try, don't know if someone is interested in ^^

Cheers
andy

On Fri, Nov 15, 2013 at 8:49 PM, Christopher Nguyen c...@adatao.com wrote:

Sure, Shay. Let's connect offline.

Sent while mobile. Pls excuse typos etc.

On Nov 16, 2013 2:27 AM, Shay Seng s...@1618labs.com wrote:

Nice, any possibility of sharing this code in advance?

On Fri, Nov 15, 2013 at 11:22 AM, Christopher Nguyen c...@adatao.com wrote:

Shay, we've done this at Adatao, specifically a big data frame in RDD representation and subsetting/projections/data mining/machine learning algorithms on that in-memory table structure. We're planning to harmonize that with the MLBase work in the near future. Just a matter of prioritization on limited resources. If there's enough interest we'll accelerate that.

Sent while mobile. Pls excuse typos etc.

On Nov 16, 2013 1:11 AM, Shay Seng s...@1618labs.com wrote:

Hi,

Is there some way to get R-style Data.Frame data structures into RDDs? I've been using RDD[Seq[]] but this is getting quite error-prone and the code gets pretty hard to read, especially after a few joins, maps etc.

Rather than access columns by index, I would prefer to access them by name. E.g. instead of writing:
myrdd.map(l => Seq(l(0), l(1), l(4), l(9)))
I would prefer to write:
myrdd.map(l => DataFrame(l.id, l.entryTime, l.exitTime, l.cost))

Also, joins are particularly irritating. Currently I have to first construct a pair:
somePairRdd.join(myrdd.map(l => ((l(1), l(2)), (l(0), l(1), l(2), l(3)))))
Now I have to unzip away the join-key and remap the values into a seq. Instead I would rather write:
someDataFrame.join(myrdd, l => l.entryTime < l.exitTime)

The question is this:
(1) I started writing a DataFrameRDD class that kept track of the column names and column values, and some optional attributes common to the entire dataframe. However I got a little muddled when trying to figure out what happens when a dataframeRDD is chained with other operations and gets transformed to other types of RDDs. The Value part of the RDD is obvious, but I didn't know the best way to pass on the column and attribute portions of the DataFrame class.
I googled around for some documentation on how to write RDDs, but only found a pptx slide presentation with very vague info. Is there a better source of info on how to write RDDs? (2) Even better than info on how to write RDDs, has anyone written an RDD that functions as a DataFrame? :-) tks shay
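[A simpler (if less dynamic) sketch of named column access than a full DataFrame RDD: map each row into a case class, so fields are referenced by name and joins can key on a named field. The Trip case class, its fields, and the sample data below are made up for illustration.]

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

case class Trip(id: String, entryTime: String, exitTime: String, cost: String)

val sc = new SparkContext("local", "named columns")
val myrdd = sc.parallelize(Seq(Seq("42", "08:00", "09:15", "12.50")))

// Columns by name instead of index:
val trips = myrdd.map(l => Trip(l(0), l(1), l(2), l(3)))
trips.filter(t => t.entryTime < t.exitTime)   // reads much better than l(1) < l(2)

// Joins can key on a named field rather than a positional tuple:
val byId = trips.map(t => (t.id, t))
// somePairRdd.join(byId) ...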
EC2 node submit jobs to separate Spark Cluster
Hi,

I'm working with an infrastructure that already has its own web server set up on EC2. I would like to set up a separate spark cluster on EC2 with the scripts and have the web server submit jobs to this spark cluster. Is it possible to do this?

I'm getting some errors running the spark shell on the web server: "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory."

I have heard that it's not possible for any local computer to connect to the spark cluster, but I was wondering if other EC2 nodes could have their firewalls configured to allow this. We don't want to deploy the web server on the master node of the spark cluster.

Thanks,
-Matt Cheah
Can not get the expected output when running the BroadcastTest example program.
Hi, all.

I'm using spark-0.8.0-incubating. I tried the example BroadcastTest in local mode:

./run-example org.apache.spark.examples.BroadcastTest local 1 2>/dev/null

This works fine and I get the result:

Iteration 0
===
100 100 100 100 100 100 100 100 100 100
Iteration 1
===
100 100 100 100 100 100 100 100 100 100

But when I run this program on the cluster (standalone mode) with:

./run-example org.apache.spark.examples.BroadcastTest spark://172.16.1.39:7077 5 2>/dev/null

the output is as follows:

Iteration 0
===
Iteration 1
===

I also tried the command ./run-example org.apache.spark.examples.BroadcastTest spark://172.16.1.39:7077 5 (without redirecting stderr), but I did not find any error message. Hope someone can give me some advice. Thank you.

The content of the file etc/spark-env.sh is as follows:

export SCALA_HOME=/usr/lib/scala-2.9.3
export SPARK_MASTER_IP=172.16.1.39
export SPARK_MASTER_WEBUI_PORT=8090
export SPARK_WORKER_WEBUI_PORT=8091
export SPARK_WORKER_MEMORY=2G
#export SPARK_CLASSPATH=.:/home/spark-0.7.3/core/target/spark-core-assembly-0.7.3.jar:$SPACK_CLASSPATH
export SPARK_CLASSPATH=.:/home/hadoop/spark-0.8.0-incubating/conf:/home/hadoop/spark-0.8.0-incubating/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.0.1.jar:/home/hadoop/hadoop-1.0.1/conf

Sincerely,
Yang, Qiang
Re: debugging a Spark error
Have you looked at the Spark executor logs? They're usually located in the $SPARK_HOME/work/ directory. If you're running in a cluster, they'll be on the individual slave nodes. These should hopefully reveal more information.

On Mon, Nov 18, 2013 at 3:42 PM, Chris Grier gr...@icsi.berkeley.edu wrote:

Hi,

I'm trying to figure out what the problem is with a job that we are running on Spark 0.7.3. When we write out via saveAsTextFile we get an exception that doesn't reveal much:

13/11/18 15:06:19 INFO cluster.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Map failed
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:849)
    at spark.storage.DiskStore.getBytes(DiskStore.scala:86)
    at spark.storage.DiskStore.getValues(DiskStore.scala:92)
    at spark.storage.BlockManager.getLocal(BlockManager.scala:284)
    at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1027)
    at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at spark.storage.BlockFetcherIterator.<init>(BlockManager.scala:1026)
    at spark.storage.BlockManager.getMultiple(BlockManager.scala:478)
    at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:51)
    at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:10)
    at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
    at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:115)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
    at spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:115)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
    at spark.RDD.iterator(RDD.scala:196)
    at spark.MappedValuesRDD.compute(PairRDDFunctions.scala:704)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
    at spark.RDD.iterator(RDD.scala:196)
    at spark.FlatMappedValuesRDD.compute(PairRDDFunctions.scala:714)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
    at spark.RDD.iterator(RDD.scala:196)
    at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
    at spark.RDD.iterator(RDD.scala:196)
    at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
    at spark.RDD.iterator(RDD.scala:196)
    at spark.scheduler.ResultTask.run(ResultTask.scala:77)
    at spark.executor.Executor$TaskRunner.run(Executor.scala:100)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

Any ideas?

-Chris
Re: EC2 node submit jobs to separate Spark Cluster
The main issue with running a spark-shell locally is that it orchestrates the actual computation, so you want it to be close to the actual Worker nodes for latency reasons. Running a spark-shell on EC2 in the same region as the Spark cluster avoids this problem. The error you're seeing seems to indicate a different issue. Check the Master web UI (accessible on port 8080 at the master's IP address) to make sure that Workers are successfully registered and they have the expected amount of memory available to Spark. You can also check to see how much memory your spark-shell is trying to get per executor. A couple common problems are (1) an abandoned spark-shell is holding onto all of your cluster's resources or (2) you've manually configured your spark-shell to try to get more memory than your Workers have available. Both of these should be visible in the web UI. On Mon, Nov 18, 2013 at 5:00 PM, Matt Cheah mch...@palantir.com wrote: Hi, I'm working with an infrastructure that already has its own web server set up on EC2. I would like to set up a *separate* spark cluster on EC2 with the scripts and have the web server submit jobs to this spark cluster. Is it possible to do this? I'm getting some errors running the spark shell from the spark shell on the web server: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory. I have heard that it's not possible for any local computer to connect to the spark cluster, but I was wondering if other EC2 nodes could have their firewalls configured to allow this. We don't want to deploy the web server on the master node of the spark cluster. Thanks, -Matt Cheah
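[For reference, a sketch of what submitting from a separate EC2 node can look like with a 0.8-era spark-shell, assuming the standalone master's port (7077) is open to that node's security group; the master address and memory figure are placeholders, and the memory should stay within what the workers report in the web UI.]

# On the web-server node (same region / security group as the cluster):
MASTER=spark://<master-internal-ip>:7077 SPARK_MEM=2g ./spark-shell
# SPARK_MEM caps how much memory the shell asks for per node, which helps avoid
# requesting more than the registered Workers actually expose.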
Re: Joining files
Was my question so dumb? Or is this not a good use case for Spark?

On Sun, Nov 17, 2013 at 11:41 PM, Something Something mailinglist...@gmail.com wrote:

I am a newbie to both Spark and Scala, but I've been working with Hadoop/Pig for quite some time. We have quite a few ETL processes running in production that use Pig, but now we're evaluating Spark to see if they would indeed run faster. A very common use case in our Pig scripts is joining a file containing Facts to a file containing Dimension data. The joins are, of course, inner and left outer.

I thought I would start simple. Let's say I've got 2 files:
1) Students: student_id, course_id, score
2) Course: course_id, course_title

We want to produce a file that contains: student_id, course_title, score
(Note: This is a hypothetical case. The real files have millions of facts and thousands of dimensions.)

Would something like this work? Note: I did say I am a newbie ;)

val students = sc.textFile("./students.txt")
val courses = sc.textFile("./courses.txt")
val s = students.map(x => x.split(','))
val left = students.map(x => x.split(',')).map(y => (y(1), y))
val right = courses.map(x => x.split(',')).map(y => (y(0), y))
val joined = left.join(right)

Any pointers in this regard would be greatly appreciated. Thanks.
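[Building on the snippet above, a sketch of the remaining step that turns the joined pairs into the desired (student_id, course_title, score) rows; the output path is a placeholder.]

// joined has the shape (course_id, (studentFields, courseFields)), where
// studentFields = [student_id, course_id, score] and courseFields = [course_id, course_title].
val result = joined.map { case (courseId, (student, course)) =>
  Seq(student(0), course(1), student(2)).mkString(",")   // student_id, course_title, score
}
result.saveAsTextFile("./student_scores")

[For the left outer case, leftOuterJoin yields an Option on the course side, so course(1) would become something like courseOpt.map(_(1)).getOrElse("").]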
Re: DataFrame RDDs
Interesting idea - in Scala you can also use the Dynamic type (http://hacking-scala.org/post/49051516694/introduction-to-type-dynamic) to allow dynamic properties. It has the same potential pitfalls as string names, but with nicer syntax.

Matei

On Nov 18, 2013, at 3:45 PM, andy petrella andy.petre...@gmail.com wrote:

Maybe I'm wrong, but this use case could be a good fit for Shapeless' records.

Shapeless' records are like, so to say, Lisp's records, but typed! In that sense, they're closer to Haskell's record notation, but imho less powerful, since access is based on a String (field name) in Shapeless where Haskell uses pure functions. Anyway, this documentation is self-explanatory and shows how we could (maybe) use them to simulate an R's frame.

Thinking out loud: when reading a csv file, for instance, what would be needed are
* a Read[T] for each column,
* fold'ing the list of columns by reading each one and prepending the result (combined with the name via ->>) to an HList.

The gain would be that we would recover one helpful feature of R's frame, which is:
R     :: frame$newCol = frame$post - frame$pre // which adds a column to a frame
Shpls :: frame2 = frame + ("newCol" ->> (frame("post") - frame("pre"))) // type-safe difference between ints, for instance

Of course, we're not recovering R's frame as-is, because we're simply dealing with rows one by one, where a frame deals with the full table -- but in the case of Spark this would make no sense to mimic, since we use RDDs for that :-D.

I didn't experiment with this yet, but it'd be fun to try, don't know if someone is interested in ^^

Cheers
andy

On Fri, Nov 15, 2013 at 8:49 PM, Christopher Nguyen c...@adatao.com wrote:

Sure, Shay. Let's connect offline.

Sent while mobile. Pls excuse typos etc.

On Nov 16, 2013 2:27 AM, Shay Seng s...@1618labs.com wrote:

Nice, any possibility of sharing this code in advance?

On Fri, Nov 15, 2013 at 11:22 AM, Christopher Nguyen c...@adatao.com wrote:

Shay, we've done this at Adatao, specifically a big data frame in RDD representation and subsetting/projections/data mining/machine learning algorithms on that in-memory table structure. We're planning to harmonize that with the MLBase work in the near future. Just a matter of prioritization on limited resources. If there's enough interest we'll accelerate that.

Sent while mobile. Pls excuse typos etc.

On Nov 16, 2013 1:11 AM, Shay Seng s...@1618labs.com wrote:

Hi,

Is there some way to get R-style Data.Frame data structures into RDDs? I've been using RDD[Seq[]] but this is getting quite error-prone and the code gets pretty hard to read, especially after a few joins, maps etc.

Rather than access columns by index, I would prefer to access them by name. E.g. instead of writing:
myrdd.map(l => Seq(l(0), l(1), l(4), l(9)))
I would prefer to write:
myrdd.map(l => DataFrame(l.id, l.entryTime, l.exitTime, l.cost))

Also, joins are particularly irritating. Currently I have to first construct a pair:
somePairRdd.join(myrdd.map(l => ((l(1), l(2)), (l(0), l(1), l(2), l(3)))))
Now I have to unzip away the join-key and remap the values into a seq. Instead I would rather write:
someDataFrame.join(myrdd, l => l.entryTime < l.exitTime)

The question is this:
(1) I started writing a DataFrameRDD class that kept track of the column names and column values, and some optional attributes common to the entire dataframe. However I got a little muddled when trying to figure out what happens when a dataframeRDD is chained with other operations and gets transformed to other types of RDDs.
The Value part of the RDD is obvious, but I didn't know the best way to pass on the column and attribute portions of the DataFrame class. I googled around for some documentation on how to write RDDs, but only found a pptx slide presentation with very vague info. Is there a better source of info on how to write RDDs? (2) Even better than info on how to write RDDs, has anyone written an RDD that functions as a DataFrame? :-) tks shay
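[A sketch of Matei's Dynamic suggestion, assuming Scala 2.10's Dynamic trait; Row is a made-up helper (not an existing Spark class), and the column names and sample data stand in for the RDD[Seq[String]] rows from Shay's message.]

import scala.language.dynamics
import org.apache.spark.SparkContext

// Wraps one row's values, keyed by column name; field access is resolved at runtime.
class Row(values: Map[String, String]) extends Dynamic with Serializable {
  def selectDynamic(name: String): String = values(name)
}

val sc = new SparkContext("local", "dynamic rows")
val myrdd = sc.parallelize(Seq(Seq("42", "08:00", "09:15", "12.50")))

val columns = Seq("id", "entryTime", "exitTime", "cost")
val rows = myrdd.map(l => new Row(columns.zip(l).toMap))

rows.filter(r => r.entryTime < r.exitTime)   // compiles to r.selectDynamic("entryTime"), etc.
rows.map(r => (r.id, r.cost))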