Re: Question regarding doing aggregation over custom partitions
Thanks, that was what I was missing!

arun
Arun Swami
+1 408-338-0906

On Fri, May 2, 2014 at 4:28 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:
You need to first partition the data by the key. Use mapPartitions instead of map.
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Fri, May 2, 2014 at 5:33 AM, Arun Swami a...@caspida.com wrote:
Hi,

I am a newbie to Spark. I looked for documentation or examples to answer my question but came up empty-handed. I don't know whether I am using the right terminology, but here goes.

I have a file of records. Initially, I had the following Spark program (I am omitting all the surrounding code and focusing only on the Spark-related code):

...
val recordsRDD = sc.textFile(pathSpec, 2).cache
val countsRDD: RDD[(String, Int)] = recordsRDD.flatMap(x => getCombinations(x))
  .map(e => (e, 1))
  .reduceByKey(_ + _)
...

Here getCombinations() is a function I have written that takes a record and returns List[String]. This program works as expected.

Now, I want to do the following. I want to partition the records in recordsRDD by some key extracted from each record. I do this as follows:

val keyValueRecordsRDD: RDD[(String, String)] = recordsRDD.map(getKeyValueRecord(_))

Here getKeyValueRecord() is a function I have written that takes a record and returns a Tuple2 of a key and the original record.

Now I want to do the same operations as before (getCombinations() and counting occurrences), but separately within each partition defined by the key. Essentially, I want to apply the operations individually in each partition. In a separate step, I want to recover the global counts across all partitions while keeping the partition-based counts. How can I do this in Spark?

Thanks!
arun
Arun Swami
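As a complement to Mayur's mapPartitions suggestion, the two-level count Arun describes (counts per key, then global counts derived from them) can be sketched in plain Python so the data flow is easy to check. This swaps in a composite-key technique rather than mapPartitions, and it is an illustration, not Spark code: `get_key_value_record` and `get_combinations` are hypothetical stand-ins for Arun's `getKeyValueRecord()` and `getCombinations()`, and the comments only roughly indicate the corresponding Spark steps (a `reduceByKey` over `((key, combination), 1)` pairs, then a second `reduceByKey` after dropping the key).

```python
from collections import Counter
from itertools import combinations


def get_key_value_record(record):
    # Hypothetical stand-in for getKeyValueRecord(): key = first CSV field.
    key = record.split(",", 1)[0]
    return key, record


def get_combinations(record):
    # Hypothetical stand-in for getCombinations(): every pair of the
    # remaining fields, joined with "+".
    fields = sorted(record.split(",")[1:])
    return ["+".join(pair) for pair in combinations(fields, 2)]


def per_key_counts(records):
    # Roughly: recordsRDD.map(getKeyValueRecord)
    #            .flatMap { case (k, r) => getCombinations(r).map(c => ((k, c), 1)) }
    #            .reduceByKey(_ + _)
    counts = Counter()
    for record in records:
        key, rec = get_key_value_record(record)
        for combo in get_combinations(rec):
            counts[(key, combo)] += 1
    return counts


def global_counts(per_key):
    # Roughly: perKey.map { case ((k, c), n) => (c, n) }.reduceByKey(_ + _)
    totals = Counter()
    for (_key, combo), n in per_key.items():
        totals[combo] += n
    return totals
```

Because the global totals are derived from the per-key result rather than recomputed from the raw records, the partition-based counts are kept around, which is the "keep both" requirement in the question.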
Re: string to int conversion
On Sat, May 3, 2014 at 2:00 AM, SK skrishna...@gmail.com wrote:
1) I have a csv file where one of the fields has integer data, but it appears as strings: "1", "3", etc. I tried using toInt to implicitly convert the strings to int after reading (field(3).toInt), but I got a NumberFormatException. So I defined my own conversion as follows, but I still get a NumberFormatException: the toInt function on StringOps is failing. Any idea how I can convert strings to int?

toInt certainly works, and you need not write your own conversion, but a string with double quotes is not a valid number. To be safest, you might write an unquote function that strips quotes if they exist, rather than just removing all quotes in the string. (Or, as mentioned above, use a Java library that specializes in handling all the details of CSV, including quoted and escaped fields; I liked SuperCSV.)
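The advice above (strip surrounding quotes only when they are present, or let a real CSV parser handle quoting) can be sketched in Python. SK's code is Scala, so this is a translation of the idea rather than a drop-in fix; `unquote`, `parse_int_field`, and `parse_int_field_csv` are hypothetical names, and the last one uses Python's standard `csv` module in the role SuperCSV plays for Java.

```python
import csv


def unquote(s):
    """Strip one pair of surrounding double quotes, only if both are present."""
    s = s.strip()
    if len(s) >= 2 and s.startswith('"') and s.endswith('"'):
        return s[1:-1]
    return s


def parse_int_field(line, index):
    # Naive split: breaks on quoted fields that themselves contain commas.
    fields = line.split(",")
    return int(unquote(fields[index]))


def parse_int_field_csv(line, index):
    # The csv module handles quoting (including embedded commas) for us.
    fields = next(csv.reader([line]))
    return int(fields[index])
```

For example, `parse_int_field('x,"42",y', 1)` returns `42`, whereas calling `int('"42"')` directly raises a `ValueError`, the Python counterpart of the `NumberFormatException` above.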
Re: sbt/sbt run command returns a JVM problem
Hi, thanks for all your help. I tried your setting in the sbt file, but the problem is still there. The Java setting in my sbt file is:

java \
  -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m \
  -jar ${JAR} \
  $@

I have tried to set these 3 parameters bigger and smaller, but nothing works. Did I change the right thing? Thank you very much.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157p5267.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark's behavior
Hi TD,

Thanks for the reply. I ran this last experiment on one computer, in local mode, but I think the time gap grows when I add more computers. I will run it again now with 8 workers and 1 word source and see what's going on. I will also measure the time, as Andrew suggested.

On May 3, 2014, at 1:19, Tathagata Das tathagata.das1...@gmail.com wrote:
From the logs, I see that print() starts printing output 10 seconds after the context is started. Those 10 seconds are taken by the initial empty job (50 map + 20 reduce tasks) that Spark Streaming runs to ensure all the executors have started. Somehow the first empty task takes 7-8 seconds to complete. See whether this can be reproduced by running a simple, empty job in the Spark shell (on the same cluster), and check whether the first task takes 7-8 seconds. Either way, I didn't see a 30-second gap, only a 10-second gap. And it does not seem to be a persistent problem, since after those 10 seconds the data is being received and processed.
TD

On Fri, May 2, 2014 at 2:14 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote:
Hi TD, I got more information today using Spark 1.0 RC3, and the situation remains the same. The lines begin after 17 sec:
14/05/02 21:52:25 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140502215225-0005/0 on hostPort computer8.ant-net:57229 with 2 cores, 2.0 GB RAM 14/05/02 21:52:25 INFO AppClient$ClientActor: Executor updated: app-20140502215225-0005/0 is now RUNNING 14/05/02 21:52:25 INFO ReceiverTracker: ReceiverTracker started 14/05/02 21:52:26 INFO ForEachDStream: metadataCleanupDelay = -1 14/05/02 21:52:26 INFO SocketInputDStream: metadataCleanupDelay = -1 14/05/02 21:52:26 INFO SocketInputDStream: Slide time = 1000 ms 14/05/02 21:52:26 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 14/05/02 21:52:26 INFO SocketInputDStream: Checkpoint interval = null 14/05/02 21:52:26 INFO SocketInputDStream: Remember duration = 1000 ms
14/05/02 21:52:26 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@5433868e 14/05/02 21:52:26 INFO ForEachDStream: Slide time = 1000 ms 14/05/02 21:52:26 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 14/05/02 21:52:26 INFO ForEachDStream: Checkpoint interval = null 14/05/02 21:52:26 INFO ForEachDStream: Remember duration = 1000 ms 14/05/02 21:52:26 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1ebdbc05 14/05/02 21:52:26 INFO SparkContext: Starting job: collect at ReceiverTracker.scala:270 14/05/02 21:52:26 INFO RecurringTimer: Started timer for JobGenerator at time 1399060346000 14/05/02 21:52:26 INFO JobGenerator: Started JobGenerator at 1399060346000 ms 14/05/02 21:52:26 INFO JobScheduler: Started JobScheduler 14/05/02 21:52:26 INFO DAGScheduler: Registering RDD 3 (reduceByKey at ReceiverTracker.scala:270) 14/05/02 21:52:26 INFO ReceiverTracker: Stream 0 received 0 blocks 14/05/02 21:52:26 INFO DAGScheduler: Got job 0 (collect at ReceiverTracker.scala:270) with 20 output partitions (allowLocal=false) 14/05/02 21:52:26 INFO DAGScheduler: Final stage: Stage 0(collect at ReceiverTracker.scala:270) 14/05/02 21:52:26 INFO DAGScheduler: Parents of final stage: List(Stage 1) 14/05/02 21:52:26 INFO JobScheduler: Added jobs for time 1399060346000 ms 14/05/02 21:52:26 INFO JobScheduler: Starting job streaming job 1399060346000 ms.0 from job set of time 1399060346000 ms 14/05/02 21:52:26 INFO JobGenerator: Checkpointing graph for time 1399060346000 ms ---14/05/02 21:52:26 INFO DStreamGraph: Updating checkpoint data for time 1399060346000 ms Time: 1399060346000 ms --- 14/05/02 21:52:26 INFO JobScheduler: Finished job streaming job 1399060346000 ms.0 from job set of time 1399060346000 ms 14/05/02 21:52:26 INFO JobScheduler: Total delay: 0.325 s for time 1399060346000 ms (execution: 0.024 s) 14/05/02 21:52:42 INFO JobScheduler: Added 
jobs for time 1399060362000 ms 14/05/02 21:52:42 INFO JobGenerator: Checkpointing graph for time 1399060362000 ms 14/05/02 21:52:42 INFO DStreamGraph: Updating checkpoint data for time 1399060362000 ms 14/05/02 21:52:42 INFO DStreamGraph: Updated checkpoint data for time 1399060362000 ms 14/05/02 21:52:42 INFO JobScheduler: Starting job streaming job 1399060362000 ms.0 from job set of time 1399060362000 ms 14/05/02 21:52:42 INFO SparkContext: Starting job: take at DStream.scala:593 14/05/02 21:52:42 INFO DAGScheduler: Got job 2 (take at DStream.scala:593) with 1 output partitions (allowLocal=true) 14/05/02 21:52:42 INFO DAGScheduler: Final stage: Stage 3(take at DStream.scala:593) 14/05/02 21:52:42 INFO DAGScheduler: Parents of final stage: List() 14/05/02 21:52:42 INFO
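TD's suggestion (time a trivial job on the same cluster and see whether only the first run is slow) can be made repeatable with a small helper. This is a minimal sketch: `time_runs` is a hypothetical name, and it simply measures wall-clock time for successive calls of whatever callable it is given.

```python
import time


def time_runs(job, runs=3):
    """Call job() `runs` times and return the wall-clock duration of each call in seconds."""
    durations = []
    for _ in range(runs):
        start = time.perf_counter()
        job()
        durations.append(time.perf_counter() - start)
    return durations
```

From a PySpark shell one might pass something like `lambda: sc.parallelize(range(1000), 50).map(lambda x: (x % 20, 1)).reduceByKey(lambda a, b: a + b, 20).collect()`, which roughly mirrors the 50 map + 20 reduce task shape TD describes. If only the first duration is large, the delay is more likely start-up cost than a persistent problem.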
what's local[n]
In the Spark Kafka example, it says:

`./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`

Can anyone tell me what local[2] represents? I thought the master URL should be something like spark://hostname:port. Also, the thread count is specified as 1 in the Kafka example; what will happen if the thread count is more than 1? Does that mean multiple Kafka consumers will be created on different Spark workers? I'm not sure how the code maps to real-time worker thread allocation. Is there any documentation on that?

Weide
Re: Crazy Kryo Exception
Poking around in the bowels of Scala, it seems like this has something to do with implicit Scala-Java collection munging. Why would it be doing this, and where? The stack trace given is entirely unhelpful to me. Is there a better one buried in my task logs? None of my tasks actually failed, so it seems that it's dying while trying to fetch results from my tasks to return back to the driver. Am I close?

On Fri, May 2, 2014 at 3:35 PM, Soren Macbeth so...@yieldbot.com wrote:
Hallo, I'm getting this rather crazy Kryo exception trying to run my Spark job:

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Can not set final scala.collection.convert.Wrappers field scala.collection.convert.Wrappers$SeqWrapper.$outer to my.custom.class Serialization trace: $outer (scala.collection.convert.Wrappers$SeqWrapper) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) at
akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I have a Kryo serializer for my.custom.class, and I've registered it using a custom registrator on my context object. I've tested the custom serializer and the registrator locally, and they both function as expected. This job is running Spark 0.9.1 under Mesos in fine-grained mode. Please help!
Re: what's local[n]
Hi Weide,

The answer to your first question about local[2] can be found in the Running the Examples and Shell section of https://spark.apache.org/docs/latest/

Note that all of the sample programs take a master parameter specifying the cluster URL to connect to. This can be a URL for a distributed cluster (https://spark.apache.org/docs/latest/scala-programming-guide.html#master-urls), or local to run locally with one thread, or local[N] to run locally with N threads. You should start by using local for testing.

For the second, I'm not sure how the thread count affects the number of Kafka consumers. My first guess would be that a thread is 1:1 with a Kafka consumer, but I'm not super familiar with Spark Streaming.

Andrew

On Sat, May 3, 2014 at 7:00 AM, Weide Zhang weo...@gmail.com wrote:
In the Spark Kafka example, it says `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`. Can anyone tell me what local[2] represents? I thought the master URL should be something like spark://hostname:port. Also, the thread count is specified as 1 in the Kafka example; what will happen if the thread count is more than 1? Does that mean multiple Kafka consumers will be created on different Spark workers? I'm not sure how the code maps to real-time worker thread allocation. Is there any documentation on that? Weide
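The three master forms Andrew lists (local, local[N], and a spark://host:port cluster URL) can be made concrete with a small classifier. This is a hypothetical helper for illustration, not Spark's own parsing code, and it deliberately covers only the forms mentioned in this thread.

```python
import re


def parse_master(master):
    """Classify the master string forms discussed in this thread (sketch only).

    Other master URL forms that Spark accepts are deliberately not handled.
    """
    if master == "local":
        return ("local", 1)  # run locally with one worker thread
    m = re.fullmatch(r"local\[(\d+)\]", master)
    if m:
        return ("local", int(m.group(1)))  # run locally with N worker threads
    m = re.fullmatch(r"spark://([^:/]+):(\d+)", master)
    if m:
        return ("standalone", m.group(1), int(m.group(2)))  # cluster URL
    raise ValueError("unsupported master string: %r" % master)
```

So the `local[2]` in the Kafka example means "run in this JVM with 2 threads" rather than pointing at a cluster.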
Lease Exception hadoop 2.4
Hello, I am getting this warning after upgrading to Hadoop 2.4, when I try to write something to HDFS. The content is written correctly, but I do not like this warning. Do I have to compile Spark with Hadoop 2.4?

WARN TaskSetManager: Loss was due to org.apache.hadoop.ipc.RemoteException org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException)

thanks
Spark-1.0.0-rc3 compiled against Hadoop 2.3.0 cannot read HDFS 2.3.0?
Hi all,

I compiled Spark-1.0.0-rc3 against Hadoop 2.3.0 with SPARK_HADOOP_VERSION=2.3.0 sbt/sbt assembly and deployed Spark with HDFS 2.3.0 (yarn = false). It seems that it cannot read data from HDFS; the following exception is thrown:

java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$CreateSnapshotRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet; at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.getDeclaredMethods0(Native Method) at java.lang.Class.privateGetDeclaredMethods(Class.java:2531) at java.lang.Class.privateGetPublicMethods(Class.java:2651) at java.lang.Class.privateGetPublicMethods(Class.java:2661) at java.lang.Class.getMethods(Class.java:1467) at sun.misc.ProxyGenerator.generateClassFile(ProxyGenerator.java:426) at sun.misc.ProxyGenerator.generateProxyClass(ProxyGenerator.java:323) at java.lang.reflect.Proxy.getProxyClass0(Proxy.java:636) at java.lang.reflect.Proxy.newProxyInstance(Proxy.java:722) at org.apache.hadoop.ipc.ProtobufRpcEngine.getProxy(ProtobufRpcEngine.java:92) at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:537) at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:334) at
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:241) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:569) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:512) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:142) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2350) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2332) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:369) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1085) at org.apache.spark.rdd.RDD.count(RDD.scala:836) at $iwC$$iwC$$iwC$$iwC.init(console:15) at $iwC$$iwC$$iwC.init(console:20) at $iwC$$iwC.init(console:22) at $iwC.init(console:24) at init(console:26) at .init(console:30) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611) at
Re: sbt/sbt run command returns a JVM problem
The problem is probably not with the JVM running sbt but with the one that sbt is forking to run your program. See here for the relevant option: https://github.com/apache/spark/blob/master/project/SparkBuild.scala#L186

You might try starting sbt with no arguments (to bring up the sbt console). You can then enter `set javaOptions += "-Xmx1G"` and afterwards try `run`.

Michael

On Sat, May 3, 2014 at 5:15 AM, Carter gyz...@hotmail.com wrote:
Hi, thanks for all your help. I tried your setting in the sbt file, but the problem is still there. The Java setting in my sbt file is:

java \
  -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m \
  -jar ${JAR} \
  $@

I have tried to set these 3 parameters bigger and smaller, but nothing works. Did I change the right thing? Thank you very much.
Re: Multiple Streams with Spark Streaming
if you want to use true Spark Streaming (not the same as Hadoop Streaming/Piping, as Mayur pointed out), you can use the DStream.union() method as described in the following docs:
http://spark.apache.org/docs/0.9.1/streaming-custom-receivers.html
http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html

our friend, diana carroll, from cloudera recently posted a nice little utility for sending files to a Spark Streaming Receiver to simulate a streaming scenario from disk. here's the link to her post: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-code-to-simulate-a-network-socket-data-source-tc3431.html

-chris

On Thu, May 1, 2014 at 3:09 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:
File as a stream? I think you are confusing Spark Streaming with a buffered reader. Spark Streaming is meant to process batches of data (files, packets, messages) as they come in, in fact using the time of packet reception as a way to create windows, etc. In your case you are better off reading the file, partitioning it, and operating on each column individually, if that makes more sense to you.
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Thu, May 1, 2014 at 3:24 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:
Hi all, is it possible to read and process multiple streams with Spark? I have an EEG (brain waves) CSV file with 23 columns. Each column is one stream (wave), and each column has one million values. I know one way to do it is to take the transpose of the file and then give it to Spark, so that each mapper gets one or more waves out of the 23, but then it becomes a non-streaming problem, and I want to read the file as a stream. Please correct me if I am wrong. I have to apply simple operations (mean and SD) on each window of a wave. Regards, Laeeq
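Mayur's suggestion (read the file and operate on each column individually) can be sketched in plain Python, outside Spark, to make the per-window computation concrete. Assumptions: windows are consecutive and non-overlapping, a trailing partial window is dropped, "SD" means population standard deviation, and `load_rows` and `window_stats` are hypothetical names. In Spark Streaming the windowing would come from DStream window operations instead.

```python
import csv
import statistics


def load_rows(fileobj):
    """Read a numeric CSV (one column per wave) into lists of floats."""
    return [[float(v) for v in row] for row in csv.reader(fileobj) if row]


def window_stats(rows, window):
    """Per column, (mean, population SD) for each consecutive window of `window` rows.

    A trailing window with fewer than `window` values is dropped.
    """
    result = []
    for column in zip(*rows):  # transpose: one tuple of values per wave
        stats = []
        for start in range(0, len(column) - window + 1, window):
            chunk = column[start:start + window]
            stats.append((statistics.mean(chunk), statistics.pstdev(chunk)))
        result.append(stats)
    return result
```

With 23 columns, `window_stats(rows, w)` returns 23 lists, one per wave, each holding a `(mean, SD)` pair per window.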
Re: Reading multiple S3 objects, transforming, writing back one
not sure if this directly addresses your issue, peter, but it's worth mentioning a handy AWS EMR utility called s3distcp that can upload a single HDFS file (in parallel) to a single, concatenated S3 file once all the partitions are uploaded. kinda cool. here's some info: http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html

s3distcp is an extension of the familiar hadoop distcp, of course.

On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote:
The fastest way to save to S3 should be to leave the RDD with many partitions, because all partitions will be written out in parallel. Then, once the various parts are in S3, somehow concatenate the files together into one file. If this can be done within S3 (I don't know if this is possible), then you get the best of both worlds: a highly parallelized write to S3, and a single cleanly named output file.

On Thu, May 1, 2014 at 12:52 PM, Peter thenephili...@yahoo.com wrote:
Thank you Patrick. I took a quick stab at it:

import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.DeleteObjectsRequest
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion
import scala.collection.JavaConverters._

val s3Client = new AmazonS3Client(...)
val copyObjectResult = s3Client.copyObject("upload", outputPrefix + "/part-0", "rolled-up-logs", "2014-04-28.csv")
val objectListing = s3Client.listObjects("upload", outputPrefix)
s3Client.deleteObjects(new DeleteObjectsRequest("upload").withKeys(objectListing.getObjectSummaries.asScala.map(s => new KeyVersion(s.getKey)).asJava))

Using a 3GB object I achieved about 33MB/s between buckets in the same AZ. This is a workable solution for the short term but not ideal for the longer term as data size increases. I understand it's a limitation of the Hadoop API, but ultimately it must be possible to dump an RDD to a single S3 object :)

On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell pwend...@gmail.com wrote:
This is a consequence of the way the Hadoop files API works. However, you can (fairly easily) add code to just rename the file because it will always produce the same filename.
(heavy use of pseudo code)

dir = "/some/dir"
rdd.coalesce(1).saveAsTextFile(dir)
f = new File(dir + "/part-0")
f.moveTo("somewhere else")
dir.remove()

It might be cool to add a utility called `saveAsSingleFile` or something that does this for you. In fact, we probably should have called saveAsTextFile saveAsTextFiles to make it more clear...

On Wed, Apr 30, 2014 at 2:00 PM, Peter thenephili...@yahoo.com wrote:
Thanks Nicholas, this is a bit of a shame, not very practical for log roll-up, for example, when every output needs to be in its own directory.

On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:
Yes, saveAsTextFile() will give you 1 part per RDD partition. When you coalesce(1), you move everything in the RDD to a single partition, which then gives you 1 output file. It will still be called part-0 or something like that because that's defined by the Hadoop API that Spark uses for reading from/writing to S3. I don't know of a way to change that.

On Wed, Apr 30, 2014 at 2:47 PM, Peter thenephili...@yahoo.com wrote:
Ah, looks like RDD.coalesce(1) solves one part of the problem.

On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com wrote:
Hi, playing around with Spark S3, I'm opening multiple objects (CSV files) with:

val hfile = sc.textFile("s3n://bucket/2014-04-28/")

so hfile is an RDD representing 10 objects that were underneath 2014-04-28. After I've sorted and otherwise transformed the content, I'm trying to write it back to a single object:

sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")

Unfortunately this results in a folder named concatted.csv with 10 objects underneath, part-0 .. part-00010, corresponding to the 10 original objects loaded. How can I achieve the desired behaviour of putting a single object named concatted.csv? I've tried 0.9.1 and 1.0.0-RC3. Thanks! Peter
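Nicholas's "write in parallel, then concatenate" idea combined with Patrick's rename-and-remove steps can be sketched for a local directory in plain Python. This assumes the output directory holds Hadoop-style `part-*` files (other entries, such as a `_SUCCESS` marker if present, are skipped). `merge_part_files` is a hypothetical helper; it does not talk to S3. For HDFS, the existing `hadoop fs -getmerge` command performs a similar merge onto local disk.

```python
import os
import shutil


def merge_part_files(output_dir, target_path):
    """Concatenate the part-* files written by saveAsTextFile into one file.

    Works on a local directory only, and target_path must lie outside
    output_dir. The directory is removed afterwards, mirroring the
    rename-then-remove steps in the pseudo code above.
    """
    # Part numbers are zero-padded, so lexicographic order is output order.
    parts = sorted(name for name in os.listdir(output_dir) if name.startswith("part-"))
    with open(target_path, "wb") as out:
        for name in parts:
            with open(os.path.join(output_dir, name), "rb") as src:
                shutil.copyfileobj(src, out)
    shutil.rmtree(output_dir)
    return parts
```

Copying byte-for-byte in part order preserves the RDD's ordering, so a sorted RDD saved this way stays sorted in the merged file.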
Re: Reading multiple S3 objects, transforming, writing back one
Hi Peter,

If your dataset is large (3GB) then why not just chunk it into multiple files? You'll need to do this anyway if you want to read/write this from S3 quickly, because S3's throughput per file is limited (as you've seen). This is exactly why the Hadoop API lets you save datasets with many partitions, since often there are bottlenecks at the granularity of a file. Is there a reason you need this to be exactly one file?

- Patrick

On Sat, May 3, 2014 at 4:14 PM, Chris Fregly ch...@fregly.com wrote:
not sure if this directly addresses your issue, peter, but it's worth mentioning a handy AWS EMR utility called s3distcp that can upload a single HDFS file (in parallel) to a single, concatenated S3 file once all the partitions are uploaded. kinda cool. here's some info: http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html s3distcp is an extension of the familiar hadoop distcp, of course.
Re: Setting the Scala version in the EC2 script?
Spark will only work with Scala 2.10... are you trying to do a minor version upgrade or upgrade to a totally different version? You could do this as follows if you want:
1. Fork the spark-ec2 repository and change the file here: https://github.com/mesos/spark-ec2/blob/v2/scala/init.sh
2. Modify your spark-ec2.py script to check out spark-ec2 from your forked version.
- Patrick

On Thu, May 1, 2014 at 2:14 PM, Ian Ferreira ianferre...@hotmail.com wrote:
Is this possible? It is very annoying to have such a great script but still have to manually update stuff afterwards.
Re: when to use broadcast variables
Broadcast variables need to fit entirely in memory, so that's a pretty good litmus test for whether to broadcast a smaller dataset or turn it into an RDD.

On Fri, May 2, 2014 at 7:50 AM, Prashant Sharma scrapco...@gmail.com wrote:
I'd like to be corrected on this, but I'd say small enough means on the order of a few hundred MB. Keep in mind that it gets shipped to all nodes: it can be a GB, but not several GBs, and it also depends on the network.
Prashant Sharma

On Fri, May 2, 2014 at 6:42 PM, Diana Carroll dcarr...@cloudera.com wrote:
Anyone have any guidance on using a broadcast variable to ship data to workers vs. an RDD? Like, say I'm joining web logs in an RDD with user account data. I could keep the account data in an RDD or, if it's small, in a broadcast variable instead. How small is small? Small enough that I know it can easily fit in memory on a single node? Some other guideline? Thanks! Diana
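Diana's web-log/account example is a map-side (broadcast) join: each task looks records up in a small in-memory dictionary instead of shuffling both datasets. Below is a minimal plain-Python sketch with hypothetical names and an assumed record shape of `(user_id, event)`; the docstring shows roughly where PySpark's `sc.broadcast(...)` and `.value` would fit. `serialized_size_mb` only gives a rough pickled size to compare against the "few hundred MB" rule of thumb above; the memory footprint on the workers can differ.

```python
import pickle


def broadcast_join(log_records, accounts):
    """Map-side join of (user_id, event) records against a small dict.

    Inner-join semantics: log records whose user is not in `accounts` are dropped.
    In PySpark this would roughly become:
        bc = sc.broadcast(accounts)
        logs.flatMap(lambda r: [(r[0], r[1], bc.value[r[0]])] if r[0] in bc.value else [])
    """
    joined = []
    for user_id, event in log_records:
        if user_id in accounts:
            joined.append((user_id, event, accounts[user_id]))
    return joined


def serialized_size_mb(obj):
    """Rough pickled size in MB; memory use on the workers can differ."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)) / 1e6
```

If the account data grows past what comfortably fits on every node, the usual fallback is to keep it as an RDD and use a regular shuffle join.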
Re: performance improvement on second operation...without caching?
Hi Diana,

Apart from these reasons, in a multi-stage job, Spark saves the map output files from map stages to the filesystem, so it only needs to rerun the last reduce stage. This is why you only saw one stage executing. These files are saved for fault recovery, but they also speed up subsequent runs.

Matei

On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote:
Ethan, what you said is actually not true: Spark won't cache RDDs unless you ask it to. The observation here (that running the same job can speed up substantially even without caching) is common. This is because other components in the stack are performing caching and optimizations. Two that can make a huge difference are:
1. The OS buffer cache, which keeps recently read disk blocks in memory.
2. The Java just-in-time (JIT) compiler, which uses runtime profiling to significantly speed up execution.
These can make a huge difference if you are running the same job over and over. And there are other things, like the OS network stack increasing TCP windows and so forth. These will all improve response time as a Spark program executes.

On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote:
I believe Spark caches RDDs it has memory for regardless of whether you actually call the 'cache' method on the RDD. The 'cache' method just tips off Spark that the RDD should have higher priority. At least, that is my experience, and it seems to correspond with your experience and with my recollection of other discussions on this topic on the list. However, going back and looking at the programming guide, this is not the way the cache/persist behavior is described. Does the guide need to be updated?

On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.com wrote:
I'm just Posty McPostalot this week, sorry folks!
:-) Anyway, another question today: I have a bit of code that is pretty time-consuming (pasted at the end of the message): it reads in a bunch of XML files, parses them, extracts some data in a map, counts (using reduce), and then sorts. All stages are executed when I do a final operation (take). The first stage is the most expensive: on the first run it takes 30s to a minute. I'm not caching anything. When I re-execute that take at the end, I expected it to re-execute all the same stages and take approximately the same amount of time, but it didn't. The second take executes only a single stage, which runs very fast: the whole operation takes less than 1 second (down from 5 minutes!). While this is awesome (!), I don't understand it. If I'm not caching data, why would I see such a marked performance improvement on subsequent execution? (Or is this related to the known 0.9.1 bug about sortByKey executing an action when it shouldn't?) Thanks, Diana

# load XML files containing device activation records.
# Find the most common device models activated
import xml.etree.ElementTree as ElementTree

# Given a partition containing multi-line XML, parse the contents.
# Return an iterator of activation Elements contained in the partition
def getactivations(fileiterator):
    s = ''
    for i in fileiterator:
        s = s + str(i)
    filetree = ElementTree.fromstring(s)
    return filetree.iter('activation')

# Get the model name from a device activation record
def getmodel(activation):
    return activation.find('model').text

filename = "hdfs://localhost/user/training/activations/*.xml"

# parse each partition as a file into an activation XML record
activations = sc.textFile(filename)
activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
models = activationTrees.map(lambda activation: getmodel(activation))

# count and sort activations by model
# (the second map swaps each pair to (count, model) so sortByKey orders by count)
topmodels = models.map(lambda model: (model, 1))\
    .reduceByKey(lambda v1, v2: v1 + v2)\
    .map(lambda pair: (pair[1], pair[0]))\
    .sortByKey(ascending=False)

# display the top 10 models
for (count, model) in topmodels.take(10):
    print("Model %s (%s)" % (model, count))

# repeat!
for (count, model) in topmodels.take(10):
    print("Model %s (%s)" % (model, count))
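To see what each stage computes without a cluster, the same logic can be run on a plain Python list standing in for one partition. This is only a local sketch: `getactivations` and `getmodel` mirror the functions above (using `Element.iter`, since `getiterator` was removed in Python 3.9), the sample XML and model names are invented, and `collections.Counter` plus `sorted` stand in for `reduceByKey` and `sortByKey`.

```python
import xml.etree.ElementTree as ElementTree
from collections import Counter


def getactivations(fileiterator):
    # Concatenate all lines of the partition into one XML document,
    # then return an iterator over its <activation> elements.
    s = ''.join(str(line) for line in fileiterator)
    return ElementTree.fromstring(s).iter('activation')


def getmodel(activation):
    # Extract the text of the <model> child of one activation record.
    return activation.find('model').text


# Invented sample data: one "partition" of lines from a multi-line XML file.
partition = [
    "<activations>",
    "<activation><model>X100</model></activation>",
    "<activation><model>Z200</model></activation>",
    "<activation><model>X100</model></activation>",
    "</activations>",
]

models = [getmodel(a) for a in getactivations(iter(partition))]

# Local equivalent of map((model, 1)).reduceByKey(+), then a descending
# sort on count after swapping each pair to (count, model).
counts = Counter(models)
topmodels = sorted(((count, model) for model, count in counts.items()), reverse=True)

for count, model in topmodels[:10]:
    print("Model %s (%s)" % (model, count))
```

Running the pipeline this way also makes it clear that the expensive part is the parsing in the first stage; the counting and sorting after the shuffle work on a much smaller set of (model, count) pairs.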
Re: Spark's behavior
Hi Eduardo, Yep, those machines look pretty well synchronized at this point. Just wanted to throw that out there and eliminate it as a possible source of confusion. Good luck continuing the debugging! Andrew On Sat, May 3, 2014 at 11:59 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, I did a test with 8 workers and 1 word source; the time gap was 27 sec, as you can see in the attached log files. Hi Andrew, I configured NTP, and all machines are synchronized:
root@computer8:/opt/unibs_test/spark-1.0RC3# for num in {1,8,10,11,13,15,16,18,22}; do ssh computer$num date; done
Sat May 3 20:57:41 CEST 2014
Sat May 3 20:57:41 CEST 2014
Sat May 3 20:57:41 CEST 2014
Sat May 3 20:57:42 CEST 2014
Sat May 3 20:57:42 CEST 2014
Sat May 3 20:57:42 CEST 2014
Sat May 3 20:57:42 CEST 2014
Sat May 3 20:57:42 CEST 2014
Sat May 3 20:57:42 CEST 2014
On May 3, 2014, at 15:46, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, Thanks for the reply. I did this last experiment with one computer, like local mode, but I think the time gap grows when I add more computers. I will run it again now with 8 workers and 1 word source and see what's going on. I will also check the clock synchronization, as Andrew suggested. On May 3, 2014, at 1:19, Tathagata Das tathagata.das1...@gmail.com wrote: From the logs, I see that the print() starts printing output 10 seconds after the context is started. Those 10 seconds are taken by the initial empty job (50 map + 20 reduce tasks) that Spark Streaming starts to ensure all the executors have started. Somehow the first empty task takes 7-8 seconds to complete. See if this can be reproduced by running a simple, empty job in the Spark shell (on the same cluster), and check whether the first task takes 7-8 seconds. Either way, I didn't see the 30-second gap, but a 10-second gap. And that does not seem to be a persistent problem, as after those 10 seconds the data is being received and processed.
TD On Fri, May 2, 2014 at 2:14 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, I got more information today using Spark 1.0 RC3, and the situation remains the same. The lines begin after 17 sec:
14/05/02 21:52:25 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140502215225-0005/0 on hostPort computer8.ant-net:57229 with 2 cores, 2.0 GB RAM
14/05/02 21:52:25 INFO AppClient$ClientActor: Executor updated: app-20140502215225-0005/0 is now RUNNING
14/05/02 21:52:25 INFO ReceiverTracker: ReceiverTracker started
14/05/02 21:52:26 INFO ForEachDStream: metadataCleanupDelay = -1
14/05/02 21:52:26 INFO SocketInputDStream: metadataCleanupDelay = -1
14/05/02 21:52:26 INFO SocketInputDStream: Slide time = 1000 ms
14/05/02 21:52:26 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
14/05/02 21:52:26 INFO SocketInputDStream: Checkpoint interval = null
14/05/02 21:52:26 INFO SocketInputDStream: Remember duration = 1000 ms
14/05/02 21:52:26 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@5433868e
14/05/02 21:52:26 INFO ForEachDStream: Slide time = 1000 ms
14/05/02 21:52:26 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
14/05/02 21:52:26 INFO ForEachDStream: Checkpoint interval = null
14/05/02 21:52:26 INFO ForEachDStream: Remember duration = 1000 ms
14/05/02 21:52:26 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1ebdbc05
14/05/02 21:52:26 INFO SparkContext: Starting job: collect at ReceiverTracker.scala:270
14/05/02 21:52:26 INFO RecurringTimer: Started timer for JobGenerator at time 1399060346000
14/05/02 21:52:26 INFO JobGenerator: Started JobGenerator at 1399060346000 ms
14/05/02 21:52:26 INFO JobScheduler: Started JobScheduler
14/05/02 21:52:26 INFO DAGScheduler: Registering RDD 3 (reduceByKey at ReceiverTracker.scala:270)
14/05/02 21:52:26 INFO ReceiverTracker: Stream 0 received 0 blocks
14/05/02 21:52:26 INFO DAGScheduler: Got job 0 (collect at ReceiverTracker.scala:270) with 20 output partitions (allowLocal=false)
14/05/02 21:52:26 INFO DAGScheduler: Final stage: Stage 0(collect at ReceiverTracker.scala:270)
14/05/02 21:52:26 INFO DAGScheduler: Parents of final stage: List(Stage 1)
14/05/02 21:52:26 INFO JobScheduler: Added jobs for time 1399060346000 ms
14/05/02 21:52:26 INFO JobScheduler: Starting job streaming job 1399060346000 ms.0 from job set of time 1399060346000 ms
14/05/02 21:52:26 INFO JobGenerator: Checkpointing graph for time 1399060346000 ms
---
14/05/02 21:52:26 INFO DStreamGraph: Updating checkpoint data for time 1399060346000 ms
Time: 1399060346000 ms
---
14/05/02 21:52:26 INFO JobScheduler: Finished job streaming job 1399060346000
Re: help me
As Mayur indicated, it's odd that you are seeing better performance from a less-local configuration. However, the non-deterministic behavior that you describe is likely caused by GC pauses in your JVM process. Take note of the *spark.locality.wait* configuration parameter described here: http://spark.apache.org/docs/latest/configuration.html This is the amount of time the Spark execution engine waits before launching a new task on a less data-local node (i.e. process -> node -> rack). By default, this is 3 seconds. If there is excessive GC occurring on the original process-local JVM, it is possible that another node-local JVM process could actually load the data from HDFS (on the same node) and complete the processing before the original process's GC finishes. You could bump up the *spark.locality.wait* default (not recommended) or increase your number of nodes/partitions to increase parallelism and reduce hotspots. Also, keep an eye on your GC characteristics. Perhaps you need to increase your Eden size to reduce the amount of movement through the GC generations and reduce major compactions (the usual GC tuning fun). Curious whether others have experienced this behavior as well? -chris On Fri, May 2, 2014 at 6:07 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: Spark would be much faster with process_local than with node_local. Node_local reads data from the local hard disk, while process_local reads data already in memory in the same process. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Apr 22, 2014 at 4:45 PM, Joe L selme...@yahoo.com wrote: I got the following performance. Is it normal in Spark to be like this? Sometimes Spark switches into node_local mode from process_local, and it becomes 10x faster. I am very confused.
scala> val a = sc.textFile("/user/exobrain/batselem/LUBM1000")
scala> f.count()
Long = 137805557
took 130.809661618 s
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/help-me-tp4598.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
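The fallback chris describes can be illustrated with a deliberately simplified model of delay scheduling. This is a toy, not Spark's actual scheduler: it assumes a single task, a process-local executor that is unavailable (for example, stuck in a GC pause) for `busy_s` seconds, and an idle node-local executor; `schedule_task` is a hypothetical name, and `locality_wait_s` plays the role of `spark.locality.wait` (3 seconds by default, per chris's note).

```python
def schedule_task(busy_s, locality_wait_s=3.0):
    """Toy model of delay scheduling for a single task.

    busy_s: seconds until the process-local executor (the JVM that already
        holds the data in memory) can accept a task, e.g. after a GC pause.
    locality_wait_s: how long the scheduler holds out for that executor before
        allowing a less-local one (the role of spark.locality.wait).
    Returns (locality_level, launch_time_s), assuming an idle node-local
    executor is always available.
    """
    if busy_s <= locality_wait_s:
        # The preferred executor frees up before the wait expires.
        return ("PROCESS_LOCAL", busy_s)
    # The wait expires first, so the task launches on the node-local executor.
    return ("NODE_LOCAL", locality_wait_s)


for busy in (1.0, 10.0):
    print(busy, schedule_task(busy))
print(schedule_task(10.0, locality_wait_s=15.0))
```

With a short pause the task stays process-local; with a 10-second pause it launches node-local after 3 seconds, which could explain "less local but faster" runs. Raising the wait to 15 seconds forces the task to sit out the whole pause, which is why bumping `spark.locality.wait` is the less attractive fix compared with reducing GC pressure.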
spark run issue
Hi, I'm trying to run the kafka-word-count example in spark2.9.1. I encountered an exception when initializing the Kafka consumer/producer config. I'm using Scala 2.10.3 and used a Maven build of the Spark Streaming Kafka library that comes with spark2.9.1. Has anyone seen this exception before? Thanks, producer:
[java] The args attribute is deprecated. Please use nested arg elements.
[java] Exception in thread "main" java.lang.NoClassDefFoundError: scala/Tuple2$mcLL$sp
[java] at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:56)
[java] at com.turn.apache.KafkaWordCountProducer$.main(HelloWorld.scala:89)
[java] at com.turn.apache.KafkaWordCountProducer.main(HelloWorld.scala)
[java] Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcLL$sp
[java] at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
[java] at java.security.AccessController.doPrivileged(Native Method)
[java] at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
[java] at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
[java] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
[java] at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
[java] ... 3 more
[java] Java Result: 1
Re: spark run issue
Hi Tathagata, I figured out the reason. I was adding the wrong Kafka lib alongside the version Spark uses. Sorry for spamming. Weide On Sat, May 3, 2014 at 7:04 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I am a little confused about the version of Spark you are using. Are you using Spark 0.9.1, which uses Scala 2.10.3? TD On Sat, May 3, 2014 at 6:16 PM, Weide Zhang weo...@gmail.com wrote: Hi, I'm trying to run the kafka-word-count example in spark2.9.1.
Re: spark run issue
Hi Tathagata, I actually have a separate question. What is the lib_managed folder inside the Spark source folder used for? Are those the libraries required for Spark Streaming to run? Do they need to be added to the Spark classpath when starting the Spark cluster? Weide On Sat, May 3, 2014 at 7:08 PM, Weide Zhang weo...@gmail.com wrote: Hi Tathagata, I figured out the reason. I was adding the wrong Kafka lib alongside the version Spark uses. Sorry for spamming. Weide
cache not work as expected for iteration?
I'm using Spark for an LDA implementation. I need to cache the RDD for the next step of Gibbs sampling; once the new result is cached, the previous cache could be uncached. Something like an LRU cache should delete the previous cache because it is never used again, but the caching does not behave as expected. Here is the code: https://github.com/Earthson/sparklda/blob/master/src/main/scala/net/earthson/nlp/lda/lda.scala#L99 Screenshots: http://apache-spark-user-list.1001560.n3.nabble.com/file/n5292/sparklda_cache1.png http://apache-spark-user-list.1001560.n3.nabble.com/file/n5292/sparklda_cache2.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cache-not-work-as-expected-for-iteration-tp5292.html
Re: performance improvement on second operation...without caching?
Hey Matei, Not sure I understand that. These are two separate jobs. So the second job takes advantage of the fact that there is map output left somewhere on disk from the first job, and reuses that? On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi Diana, Apart from these reasons, in a multi-stage job, Spark saves the map output files from map stages to the filesystem, so it only needs to rerun the last reduce stage. This is why you only saw one stage executing. These files are saved for fault recovery, but they also speed up subsequent runs. Matei
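Koert's question is about why a second, separate job can skip work. The class below is a toy model, not Spark's implementation, and `ToyShuffledRDD`, `count`, and `filter_count` are hypothetical names. It keeps the map-side output of a `map -> reduceByKey` step after the first action, the way Spark keeps shuffle files on disk, so a later action on the same lineage (or on a child such as a filter) reruns only the reduce side.

```python
class ToyShuffledRDD:
    """Toy model (not Spark's implementation) of a map -> reduceByKey lineage.

    The map-side output is kept after the first action, the way Spark keeps
    shuffle files, so later actions rerun only the reduce side.
    """

    def __init__(self, records, key_fn):
        self.records = records
        self.key_fn = key_fn
        self.shuffle_output = None  # stands in for map output files on disk
        self.map_stage_runs = 0     # how many times the map stage has executed

    def _run_map_stage(self):
        # Group (key, 1) pairs by key, like the map side of a shuffle.
        self.map_stage_runs += 1
        buckets = {}
        for record in self.records:
            buckets.setdefault(self.key_fn(record), []).append(1)
        return buckets

    def _reduce(self):
        # Run the map stage only if no shuffle output exists yet.
        if self.shuffle_output is None:
            self.shuffle_output = self._run_map_stage()
        # The reduce side re-reads the saved map output on every action.
        return {key: sum(ones) for key, ones in self.shuffle_output.items()}

    def count(self):
        return len(self._reduce())

    def filter_count(self, pred):
        # Count reduced (key, total) pairs satisfying pred, like filter(...).count().
        return sum(1 for pair in self._reduce().items() if pred(pair))


data1 = ToyShuffledRDD(["a", "b", "a", "c"], key_fn=lambda word: word)
print(data1.count())                                 # prints 3
print(data1.filter_count(lambda pair: pair[1] > 1))  # prints 1
print(data1.map_stage_runs)                          # prints 1
```

The map stage runs once even though two actions were executed. In real Spark the reduce side still re-reads that shuffle output, possibly over the network, on each action; calling `cache()` on the reduced RDD is what would avoid even that step.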
Re: sbt/sbt run command returns a JVM problem
Hi Michael, Thank you very much for your reply. Sorry, I am not very familiar with sbt. Could you tell me where to set the Java option for the sbt fork for my program? I brought up the sbt console and ran set javaOptions += "-Xmx1G" in it, but it returned an error: [error] scala.tools.nsc.MissingRequirementError: object scala not found. [error] Use 'last' for the full log. Is this the right way to set the Java option? Thank you very much. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157p5294.html
Re: performance improvement on second operation...without caching?
Yes, this happens as long as you use the same RDD. For example, say you do the following:
data1 = sc.textFile(…).map(…).reduceByKey(…)
data1.count()
data1.filter(…).count()
The first count() causes the outputs of the map/reduce pair in there to be written out to shuffle files. The next time you do a count, on either this RDD or a child (e.g. after the filter), we notice that output files were already generated for this shuffle, so we don't rerun the map stage. Note that the output does get read again over the network, which is kind of wasteful (if you really wanted to reuse this as quickly as possible, you'd use cache()). Matei On May 3, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote: Hey Matei, Not sure I understand that. These are two separate jobs. So the second job takes advantage of the fact that there is map output left somewhere on disk from the first job, and reuses that? On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi Diana, Apart from these reasons, in a multi-stage job, Spark saves the map output files from map stages to the filesystem, so it only needs to rerun the last reduce stage. This is why you only saw one stage executing. These files are saved for fault recovery, but they also speed up subsequent runs. Matei On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote: Ethan, What you said is actually not true: Spark won't cache RDDs unless you ask it to. The observation here, that running the same job can speed up substantially even without caching, is common. This is because other components in the stack are performing caching and optimizations. Two that can make a huge difference are: 1. The OS buffer cache, which keeps recently read disk blocks in memory. 2. The Java just-in-time (JIT) compiler, which uses runtime profiling to significantly speed up execution. These can make a huge difference if you are running the same job over and over.
And there are other things, like the OS network stack increasing TCP windows, and so forth. These will all improve response time as a Spark program executes. On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote: I believe Spark caches RDDs it has memory for regardless of whether you actually call the 'cache' method on the RDD. The 'cache' method just tips off Spark that the RDD should have higher priority. At least, that is my experience, and it seems to correspond with your experience and with my recollection of other discussions on this topic on the list. However, going back and looking at the programming guide, this is not the way the cache/persist behavior is described. Does the guide need to be updated? On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm just Posty McPostalot this week, sorry folks! :-) Anyway, another question today: I have a bit of code that is pretty time-consuming (pasted at the end of the message): it reads in a bunch of XML files, parses them, extracts some data in a map, counts (using reduce), and then sorts. All stages are executed when I do a final operation (take). The first stage is the most expensive: on the first run it takes 30s to a minute. I'm not caching anything. When I re-execute that take at the end, I expected it to re-execute all the same stages and take approximately the same amount of time, but it didn't. The second take executes only a single stage, which runs very fast: the whole operation takes less than 1 second (down from 5 minutes!). While this is awesome (!), I don't understand it. If I'm not caching data, why would I see such a marked performance improvement on subsequent execution? (Or is this related to the known 0.9.1 bug about sortByKey executing an action when it shouldn't?) Thanks, Diana
# load XML files containing device activation records.
# Find the most common device models activated
import xml.etree.ElementTree as ElementTree

# Given a partition containing multi-line XML, parse the contents.
# Return an iterator of activation Elements contained in the partition
def getactivations(fileiterator):
    s = ''
    for i in fileiterator:
        s = s + str(i)
    filetree = ElementTree.fromstring(s)
    return filetree.iter('activation')

# Get the model name from a device activation record
def getmodel(activation):
    return activation.find('model').text

filename = "hdfs://localhost/user/training/activations/*.xml"

# parse each partition as a file into an activation XML record
activations = sc.textFile(filename)
activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
models = activationTrees.map(lambda activation: getmodel(activation))

# count and sort activations by model
topmodels = models.map(lambda model: (model, 1))\
    .reduceByKey(lambda v1, v2: v1 + v2)\