Re: sparkR - is it possible to run sparkR on yarn?
We don't have any documentation on running SparkR on YARN, and I think there might be some issues that need to be fixed (the recent PySpark-on-YARN PRs are an example). SparkR has only been tested to work with Spark standalone mode so far. Thanks Shivaram On Tue, Apr 29, 2014 at 7:56 PM, phoenix bai mingzhi...@gmail.com wrote: Hi all, I searched around but failed to find anything about running SparkR on YARN. So, is it possible to run SparkR with YARN, in either yarn-standalone or yarn-client mode? If so, is there any document that could guide me through the build and setup process? I am desperate for some answers, so please help!
Setting spark.locality.wait.node parameter in interactive shell
Hi, Any suggestions on the following issue? I have replication factor 3 in my HDFS. I ran my experiments with 3 datanodes, and then added another node with no data on it. When I ran the job again, Spark launched non-local tasks on the new node, and the time taken was longer than on the 3-node cluster. I think delay scheduling fails here because of the spark.locality.wait.node parameter, which is 3 seconds by default; after that it launches ANY-level tasks on the added data node. How do I set the spark.locality.wait.node parameter in the environment for the interactive shell (sc)? Thanks!
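For reference, locality waits in this era of Spark (assumed 0.9.x) are plain system properties, so a rough sketch looks like the following; the master URL and wait value are assumptions, not from the thread. For the stock spark-shell, where sc already exists, the same -Dspark.locality.wait.node=... setting is usually passed through SPARK_JAVA_OPTS before launching the shell.

    import org.apache.spark.SparkContext

    // set before the context is created; the value is in milliseconds
    System.setProperty("spark.locality.wait.node", "10000")
    val sc = new SparkContext("spark://master:7077", "locality-test")   // placeholder master URL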
RE: How fast would you expect shuffle serialize to be?
I just tried to use the serializer to write objects directly in local mode with this code: val datasize = args(1).toInt val dataset = (0 until datasize).map(i => ("asmallstring", i)) val out: OutputStream = new BufferedOutputStream(new FileOutputStream(args(2)), 1024 * 100) val ser = SparkEnv.get.serializer.newInstance() val serOut = ser.serializeStream(out) dataset.foreach(value => serOut.writeObject(value)) serOut.flush() serOut.close() Thus one core on one disk. When using JavaSerializer, throughput is 10~12MB/s, and Kryo doubles that. So it seems to me that when running the full code path in my previous case, 32 cores with 50MB/s total throughput is reasonable? Best Regards, Raymond Liu -----Original Message----- From: Liu, Raymond [mailto:raymond@intel.com] Latter case, total throughput aggregated from all cores. Best Regards, Raymond Liu -----Original Message----- From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: Wednesday, April 30, 2014 1:22 PM To: user@spark.apache.org Subject: Re: How fast would you expect shuffle serialize to be? Hm - I'm still not sure if you mean 100MB/s for each task => 3200MB/s across all cores, or 3.1MB/s for each task => 100MB/s across all cores. If it's the second one, that's really slow and something is wrong. If it's the first one, this is in the range of what I'd expect, but I'm no expert. On Tue, Apr 29, 2014 at 10:14 PM, Liu, Raymond raymond@intel.com wrote: For all the tasks, say 32 tasks in total. Best Regards, Raymond Liu -----Original Message----- From: Patrick Wendell [mailto:pwend...@gmail.com] Is this the serialization throughput per task or the serialization throughput for all the tasks? On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond raymond@intel.com wrote: Hi, I am running a WordCount program which counts words from HDFS, and I noticed that the serializer part of the code takes a lot of CPU time. On a 16-core/32-thread node, the total throughput is around 50MB/s with JavaSerializer, and if I switch to KryoSerializer, it doubles to around 100-150MB/s. (I have 12 disks per node and files scattered across disks, so HDFS bandwidth is not a problem.) I also notice that in this case the object to write is (String, Int); if I try a case with (Int, Int), the throughput is a further 2-3x faster. So, in my WordCount case, the bottleneck is CPU (because with shuffle compression on, 150MB/s of data bandwidth on the input side usually leads to around 50MB/s of shuffle data). This serialization bandwidth looks somewhat too low to me, so I am wondering: what bandwidth do you observe in your case? Does this throughput sound reasonable to you? If not, is there anything that might need to be examined in my case? Best Regards, Raymond Liu
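For completeness, a minimal sketch (not from the thread) of how Kryo was typically enabled in this property-based configuration style; the registrator class name below is an assumption made up for illustration.

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // register the (String, Int) record type used in the word-count test above
    class WordCountRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Tuple2[String, Int]])
      }
    }

    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "WordCountRegistrator")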
Re: Union of 2 RDD's only returns the first one
Yes, that's what I meant. Sure, the numbers might not be actually sorted, but the order of rows is semantically kept throughout non-shuffling transforms. I'm on board with you on union as well. Back to the original question, then, why is it important to coalesce to a single partition? When you union two RDDs, for example, rdd1 = ["a, b, c"], rdd2 = ["1, 2, 3", "4, 5, 6"], then rdd1.union(rdd2).saveAsTextFile(...) should've resulted in a file with three lines "a, b, c", "1, 2, 3", and "4, 5, 6" because the partitions from the two RDDs are concatenated. Mingyu On 4/29/14, 10:55 PM, Patrick Wendell pwend...@gmail.com wrote: If you call map() on an RDD it will retain the ordering it had before, but that is not necessarily a correct sort order for the new RDD. var rdd = sc.parallelize([2, 1, 3]); var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3] var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0] Note that mapped is no longer sorted. When you union two RDD's together it will effectively concatenate the two orderings, which is also not a valid sorted order on the new RDD: rdd1 = [1,2,3] rdd2 = [1,4,5] rdd1.union(rdd2) = [1,2,3,1,4,5] On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim m...@palantir.com wrote: Thanks for the quick response! To better understand it, the reason a sorted RDD has a well-defined ordering is because sortedRDD.getPartitions() returns the partitions in the right order and each partition internally is properly sorted. So, if you have var rdd = sc.parallelize([2, 1, 3]); var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3] var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4] Since mapValues doesn't change the order of partitions nor the order of rows within the partitions, I think "mapped" should have the exact same order as "sorted". Sure, if a transform involves shuffling, the order will change. Am I mistaken? Is there an extra detail in sortedRDD that guarantees a well-defined ordering? If it's true that the order of partitions returned by RDD.getPartitions() and the row orders within the partitions determine the row order, I'm not sure why union doesn't respect the order, because the union operation simply concatenates the two lists of partitions from the two RDDs. Mingyu On 4/29/14, 10:25 PM, Patrick Wendell pwend...@gmail.com wrote: You are right, once you sort() the RDD, then yes it has a well defined ordering. But that ordering is lost as soon as you transform the RDD, including if you union it with another RDD. On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim m...@palantir.com wrote: Hi Patrick, I'm a little confused about your comment that RDDs are not ordered. As far as I know, RDDs keep a list of partitions that are ordered, and this is why I can call RDD.take() and get the same first k rows every time I call it, and RDD.take() returns the same entries as RDD.map(…).take() because map preserves the partition order. RDD order is also what allows me to get the top k out of an RDD by doing RDD.sort().take(). Am I misunderstanding it? Or, is it just when an RDD is written to disk that the order is not well preserved? Thanks in advance! Mingyu On 1/22/14, 4:46 PM, Patrick Wendell pwend...@gmail.com wrote: Ah somehow after all this time I've never seen that! On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia buendia...@gmail.com wrote: On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell pwend...@gmail.com wrote: What is the ++ operator here? Is this something you defined?
No, it's an alias for union defined in RDD.scala: def ++(other: RDD[T]): RDD[T] = this.union(other) Another issue is that RDDs are not ordered, so when you union two together it doesn't have a well defined ordering. If you do want to do this you could coalesce into one partition, then call mapPartitions and return an iterator that first adds your header and then the rest of the file, then call saveAsTextFile. Keep in mind this will only work if you coalesce into a single partition. Thanks! I'll give this a try. myRdd.coalesce(1) .map(_.mkString(",")) .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator) .saveAsTextFile("out.csv") - Patrick On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia buendia...@gmail.com wrote: Hi, I'm trying to find a way to create a csv header when using saveAsTextFile, and I came up with this: (sc.makeRDD(Array("col1,col2,col3"), 1) ++ myRdd.coalesce(1).map(_.mkString(","))) .saveAsTextFile("out.csv") But it only saves the header part. Why is it that the union method does not return both RDDs?
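A self-contained version of that header trick, runnable as written against a local master (the sample rows are made up for illustration):

    import org.apache.spark.SparkContext

    object CsvWithHeader {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "csv-header")
        val myRdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6)))
        myRdd.coalesce(1)                          // single partition, so the header lands first
          .map(_.mkString(","))
          .mapPartitions(it => Iterator("col1,col2,col3") ++ it)
          .saveAsTextFile("out.csv")               // a directory out.csv containing one part file
      }
    }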
Re: NoSuchMethodError from Spark Java
I fixed it. I made my sbt project depend on spark/trunk/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar and it works. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-from-Spark-Java-tp4937p5096.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Shuffle Spill Issue
Whoops, you are right. Sorry for the misinformation. Indeed reduceByKey just calls combineByKey: def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { combineByKey[V]((v: V) => v, func, func, partitioner) } (I think I confused reduceByKey with groupByKey.) On Wed, Apr 30, 2014 at 2:55 AM, Liu, Raymond raymond@intel.com wrote: Hi Daniel, Thanks for your reply. I think reduceByKey will also do a map-side combine, thus the result is the same, say, for each partition, one entry per distinct word. In my case with JavaSerializer, the 240MB dataset yields around 70MB of shuffle data. Only the Shuffle Spill (Memory) is abnormal, and it sounds to me like it should not trigger at all. And, by the way, this behavior only occurs on the map output side; on the reduce / shuffle fetch side, this strange behavior doesn't happen. Best Regards, Raymond Liu From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com] I have no idea why shuffle spill is so large. But this might make it smaller: val addition = (a: Int, b: Int) => a + b val wordsCount = wordsPair.combineByKey(identity, addition, addition) This way only one entry per distinct word will end up in the shuffle for each partition, instead of one entry per word occurrence. On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond raymond@intel.com wrote: Hi Patrick, I am just doing a simple word count; the data is generated by the Hadoop random text writer. This seems to me not quite related to compression. If I turn off compression on the shuffle, the metrics look something like below for the smaller 240MB dataset:

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
10 | sr437:48527 | 35 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 2.2 GB | 1291.2 KB
12 | sr437:46077 | 34 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1822.6 MB | 1073.3 KB
13 | sr434:37896 | 31 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1099.2 MB | 621.2 KB
15 | sr438:52819 | 31 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1898.8 MB | 1072.6 KB
16 | sr434:37103 | 32 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1638.0 MB | 1044.6 KB

And the program is pretty simple: val files = sc.textFile(args(1)) val words = files.flatMap(_.split(" ")) val wordsPair = words.map(x => (x, 1)) val wordsCount = wordsPair.reduceByKey(_ + _) val count = wordsCount.count() println("Number of words = " + count) Best Regards, Raymond Liu From: Patrick Wendell [mailto:pwend...@gmail.com] Could you explain more about what your job is doing and what data types you are using? These numbers alone don't necessarily indicate something is wrong. The relationship between the in-memory and on-disk shuffle amount is definitely a bit strange; the data gets compressed when written to disk, but unless you have a weird dataset (e.g. all zeros) I wouldn't expect it to compress _that_ much. On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond raymond@intel.com wrote: Hi, I am running a simple word count program on a Spark standalone cluster. The cluster is made up of 6 nodes, each running 4 workers, and each worker owns 10G memory and 16 cores, thus 96 cores and 240G memory in total. (It was also previously configured as 1 worker with 40G memory on each node.) I ran a very small data set (2.4GB on HDFS in total) to confirm the problem, as below. As you can read from part of the task metrics below, I noticed that the shuffle spill part of the metrics indicates that something is wrong:

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | sr437:42139 | 29 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 23.6 GB | 4.3 MB
1 | sr433:46935 | 1.1 min | 4 | 0 | 4 | 0.0 B | 4.2 MB | 19.0 GB | 3.4 MB
10 | sr436:53277 | 26 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.6 GB | 4.6 MB
11 | sr437:58872 | 32 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.0 GB | 4.4 MB
12 | sr435:48358 | 27 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.1 GB | 4.4 MB

You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x the actual shuffle data and Shuffle Spill (Disk), and it also seems to me that the spill should by no means trigger, since the memory is not used up at all. To verify that, I further reduced the data size to 240MB in total, and here is the result:

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | sr437:50895 | 15 s | 4 | 0 | 4 | 0.0 B | 703.0 KB | 80.0 MB | 43.2 KB
1 | sr433:50207 | 17 s | 4 | 0 | 4 | 0.0 B | 704.7 KB | 389.5 MB | 90.2 KB
the spark configuage
Hi, when I configure Spark and run the shell command ./spark-shell, it tells me: WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable. When it connects to the ResourceManager, it stops. What should I do? Wish your reply -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Joining not-pair RDDs in Spark
That's the approach I finally used. Thanks for your help :-) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-not-pair-RDDs-in-Spark-tp5034p5099.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: the spark configuage
Hi, the reason you saw that warning is that the native Hadoop library $HADOOP_HOME/lib/native/libhadoop.so.1.0.0 was compiled for 32-bit. Anyway, it's just a warning, and won't impact Hadoop's functionality. If you do want to eliminate this warning, download the Hadoop source code and recompile libhadoop.so.1.0.0 on a 64-bit system, then replace the 32-bit one. Steps on how to recompile the source code on Ubuntu are included here: http://www.ercoppa.org/Linux-Compile-Hadoop-220-fix-Unable-to-load-native-hadoop-library.htm http://www.csrdu.org/nauman/2014/01/23/geting-started-with-hadoop-2-2-0-building/ Good luck. On Wed, Apr 30, 2014 at 1:28 PM, Sophia sln-1...@163.com wrote: Hi, when I configure Spark and run the shell command ./spark-shell, it tells me: WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable. When it connects to the ResourceManager, it stops. What should I do? Wish your reply -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: the spark configuage
On 30 Apr 2014 10:35, Akhil Das ak...@sigmoidanalytics.com wrote: Hi, the reason you saw that warning is that the native Hadoop library $HADOOP_HOME/lib/native/libhadoop.so.1.0.0 was compiled for 32-bit. Anyway, it's just a warning, and won't impact Hadoop's functionality. If you do want to eliminate this warning, download the Hadoop source code and recompile libhadoop.so.1.0.0 on a 64-bit system, then replace the 32-bit one. Steps on how to recompile the source code on Ubuntu are included here: http://www.ercoppa.org/Linux-Compile-Hadoop-220-fix-Unable-to-load-native-hadoop-library.htm http://www.csrdu.org/nauman/2014/01/23/geting-started-with-hadoop-2-2-0-building/ Good luck. On Wed, Apr 30, 2014 at 1:28 PM, Sophia sln-1...@163.com wrote: Hi, when I configure Spark and run the shell command ./spark-shell, it tells me: WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable. When it connects to the ResourceManager, it stops. What should I do? Wish your reply -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
new Washington DC Area Spark Meetup
Hi, all! For those in the Washington DC area (DC/MD/VA), we just started a new Spark Meetup. We'd love for you to join! -d Here's the link: http://www.meetup.com/Washington-DC-Area-Spark-Interactive/ Description: This is an interactive meetup for Washington DC, Virginia and Maryland users, enthusiasts, and explorers of Apache Spark (www.spark-project.org). Spark is the powerful open source data processing framework that extends and accelerates Hadoop, built around speed, ease of use, and sophisticated analytics. This is a very interactive meetup where we will exchange ideas, inspire and learn from each other, and bring the top minds and innovators around big data and real-time solutions to the table. This meetup is meant for both the business and technical community (both the curious and the experts) interested in the how and the why of Spark's capabilities. The meetup will include introductions to the various Spark features and functions, case studies from current users, best practices for deployment, and speakers from the top technology leaders and vendors who are helping craft Spark's roadmap for the future. Let's build this exciting Spark community together! Thanks, Donna-M. Fernandez | VP of Operations Services | MetiStream, Inc. | do...@metistream.com | 202.642.3220
Re: the spark configuage
I'm guessing your shell stopping when it attempts to connect to the RM is not related to that warning. You'll get that message out of the box from Spark if you don't have HADOOP_HOME set correctly. I'm using CDH 5.0 installed in the default locations, and got rid of the warning by setting HADOOP_HOME to /usr/lib/hadoop. The stopping issue might be something unrelated. Diana On Wed, Apr 30, 2014 at 3:58 AM, Sophia sln-1...@163.com wrote: Hi, when I configure Spark and run the shell command ./spark-shell, it tells me: WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable. When it connects to the ResourceManager, it stops. What should I do? Wish your reply -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Union of 2 RDD's only returns the first one
Okay, that makes sense. It'd be great if this can be better documented at some point, because the only way to find out about the resulting RDD row order is by looking at the code. Thanks for the discussion! Mingyu On 4/29/14, 11:59 PM, Patrick Wendell pwend...@gmail.com wrote: I don't think we guarantee anywhere that union(A, B) will behave by concatenating the partitions, it just happens to be an artifact of the current implementation. rdd1 = [1,2,3] rdd2 = [1,4,5] rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it wouldn't violate the contract of union AFAIK the only guarantee is that the resulting RDD will contain all elements. - Patrick On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim m...@palantir.com wrote: Yes, that's what I meant. Sure, the numbers might not be actually sorted, but the order of rows is semantically kept throughout non-shuffling transforms. I'm on board with you on union as well. Back to the original question, then, why is it important to coalesce to a single partition? When you union two RDDs, for example, rdd1 = ["a, b, c"], rdd2 = ["1, 2, 3", "4, 5, 6"], then rdd1.union(rdd2).saveAsTextFile(...) should've resulted in a file with three lines "a, b, c", "1, 2, 3", and "4, 5, 6" because the partitions from the two RDDs are concatenated. Mingyu On 4/29/14, 10:55 PM, Patrick Wendell pwend...@gmail.com wrote: If you call map() on an RDD it will retain the ordering it had before, but that is not necessarily a correct sort order for the new RDD. var rdd = sc.parallelize([2, 1, 3]); var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3] var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0] Note that mapped is no longer sorted. When you union two RDD's together it will effectively concatenate the two orderings, which is also not a valid sorted order on the new RDD: rdd1 = [1,2,3] rdd2 = [1,4,5] rdd1.union(rdd2) = [1,2,3,1,4,5] On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim m...@palantir.com wrote: Thanks for the quick response! To better understand it, the reason a sorted RDD has a well-defined ordering is because sortedRDD.getPartitions() returns the partitions in the right order and each partition internally is properly sorted. So, if you have var rdd = sc.parallelize([2, 1, 3]); var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3] var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4] Since mapValues doesn't change the order of partitions nor the order of rows within the partitions, I think "mapped" should have the exact same order as "sorted". Sure, if a transform involves shuffling, the order will change. Am I mistaken? Is there an extra detail in sortedRDD that guarantees a well-defined ordering? If it's true that the order of partitions returned by RDD.getPartitions() and the row orders within the partitions determine the row order, I'm not sure why union doesn't respect the order, because the union operation simply concatenates the two lists of partitions from the two RDDs. Mingyu On 4/29/14, 10:25 PM, Patrick Wendell pwend...@gmail.com wrote: You are right, once you sort() the RDD, then yes it has a well defined ordering. But that ordering is lost as soon as you transform the RDD, including if you union it with another RDD. On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim m...@palantir.com wrote: Hi Patrick, I'm a little confused about your comment that RDDs are not ordered.
As far as I know, RDDs keep a list of partitions that are ordered, and this is why I can call RDD.take() and get the same first k rows every time I call it, and RDD.take() returns the same entries as RDD.map(…).take() because map preserves the partition order. RDD order is also what allows me to get the top k out of an RDD by doing RDD.sort().take(). Am I misunderstanding it? Or, is it just when an RDD is written to disk that the order is not well preserved? Thanks in advance! Mingyu On 1/22/14, 4:46 PM, Patrick Wendell pwend...@gmail.com wrote: Ah somehow after all this time I've never seen that! On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia buendia...@gmail.com wrote: On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell pwend...@gmail.com wrote: What is the ++ operator here? Is this something you defined? No, it's an alias for union defined in RDD.scala: def ++(other: RDD[T]): RDD[T] = this.union(other) Another issue is that RDDs are not ordered, so when you union two together it doesn't have a well defined ordering. If you do want to do this you could coalesce into one partition, then call mapPartitions and return an iterator that first adds your header and then the rest of the file, then call saveAsTextFile. Keep in mind this will only work if you coalesce into a single partition. Thanks! I'll give this a try. myRdd.coalesce(1) .map(_.mkString(",")) .mapPartitions(it => (Seq("col1,col2,col3") ++
Reading multiple S3 objects, transforming, writing back one
Hi, Playing around with Spark and S3, I'm opening multiple objects (CSV files) with: val hfile = sc.textFile("s3n://bucket/2014-04-28/") so hfile is an RDD representing the 10 objects that were underneath 2014-04-28. After I've sorted and otherwise transformed the content, I'm trying to write it back to a single object: sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv") Unfortunately this results in a folder named concatted.csv with 10 objects underneath, part-00000 .. part-00010, corresponding to the 10 original objects loaded. How can I achieve the desired behaviour of putting a single object named concatted.csv? I've tried 0.9.1 and 1.0.0-RC3. Thanks! Peter
Re: Reading multiple S3 objects, transforming, writing back one
Ah, looks like RDD.coalesce(1) solves one part of the problem. On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com wrote: Hi, Playing around with Spark and S3, I'm opening multiple objects (CSV files) with: val hfile = sc.textFile("s3n://bucket/2014-04-28/") so hfile is an RDD representing the 10 objects that were underneath 2014-04-28. After I've sorted and otherwise transformed the content, I'm trying to write it back to a single object: sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv") Unfortunately this results in a folder named concatted.csv with 10 objects underneath, part-00000 .. part-00010, corresponding to the 10 original objects loaded. How can I achieve the desired behaviour of putting a single object named concatted.csv? I've tried 0.9.1 and 1.0.0-RC3. Thanks! Peter
Re: Union of 2 RDD's only returns the first one
I agree with you in general that, as an API user, I shouldn't be relying on the code. However, without looking at the code, there is no way for me to find out even whether map() keeps the row order. Without that knowledge, I'd need to do a "sort" every time I need certain things in a certain order (and sort is really expensive). On the other hand, if I can assume, say, that "filter" or "map" doesn't shuffle the rows around, I can do the sort once and assume that the order is retained throughout such operations, saving a lot of time from doing unnecessary sorts. Mingyu From: Mark Hamstra m...@clearstorydata.com Reply-To: user@spark.apache.org Date: Wednesday, April 30, 2014 at 11:36 AM To: user@spark.apache.org Subject: Re: Union of 2 RDD's only returns the first one Which is what you shouldn't be doing as an API user, since that implementation code might change. The documentation doesn't mention a row ordering guarantee, so none should be assumed. It is hard enough for us to correctly document all of the things that the API does do. We really shouldn't be forced into the expectation that we will also fully document everything that the API doesn't do. On Wed, Apr 30, 2014 at 11:13 AM, Mingyu Kim m...@palantir.com wrote: Okay, that makes sense. It'd be great if this can be better documented at some point, because the only way to find out about the resulting RDD row order is by looking at the code. Thanks for the discussion! Mingyu On 4/29/14, 11:59 PM, Patrick Wendell pwend...@gmail.com wrote: I don't think we guarantee anywhere that union(A, B) will behave by concatenating the partitions, it just happens to be an artifact of the current implementation. rdd1 = [1,2,3] rdd2 = [1,4,5] rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it wouldn't violate the contract of union AFAIK the only guarantee is that the resulting RDD will contain all elements. - Patrick On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim m...@palantir.com wrote: Yes, that's what I meant. Sure, the numbers might not be actually sorted, but the order of rows is semantically kept throughout non-shuffling transforms. I'm on board with you on union as well. Back to the original question, then, why is it important to coalesce to a single partition? When you union two RDDs, for example, rdd1 = ["a, b, c"], rdd2 = ["1, 2, 3", "4, 5, 6"], then rdd1.union(rdd2).saveAsTextFile(...) should've resulted in a file with three lines "a, b, c", "1, 2, 3", and "4, 5, 6" because the partitions from the two RDDs are concatenated. Mingyu On 4/29/14, 10:55 PM, Patrick Wendell pwend...@gmail.com wrote: If you call map() on an RDD it will retain the ordering it had before, but that is not necessarily a correct sort order for the new RDD. var rdd = sc.parallelize([2, 1, 3]); var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3] var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0] Note that mapped is no longer sorted. When you union two RDD's together it will effectively concatenate the two orderings, which is also not a valid sorted order on the new RDD: rdd1 = [1,2,3] rdd2 = [1,4,5] rdd1.union(rdd2) = [1,2,3,1,4,5] On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim m...@palantir.com wrote: Thanks for the quick response! To better understand it, the reason a sorted RDD has a well-defined ordering is because sortedRDD.getPartitions() returns the partitions in the right order and each partition internally is properly sorted.
So, if you have var rdd = sc.parallelize([2, 1, 3]); var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3] var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4] Since mapValues doesn't change the order of partitions nor the order of rows within the partitions, I think "mapped" should have the exact same order as "sorted". Sure, if a transform involves shuffling, the order will change. Am I mistaken? Is there an extra detail in sortedRDD that guarantees a well-defined ordering? If it's true that the order of partitions returned by RDD.getPartitions() and the row orders within the partitions determine the row order, I'm not sure why union doesn't respect the order, because the union operation simply concatenates the two lists of partitions from the two RDDs. Mingyu On 4/29/14, 10:25 PM, Patrick Wendell pwend...@gmail.com wrote: You are right, once you sort() the RDD, then yes it has a well defined ordering. But that ordering is lost as soon as you transform the RDD, including if you union it with another RDD. On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim m...@palantir.com wrote: Hi Patrick, I'm a little confused about your comment that RDDs are not ordered. As far as I know, RDDs keep a list of partitions that are
Re: Reading multiple S3 objects, transforming, writing back one
Yes, saveAsTextFile() will give you one part-file per RDD partition. When you coalesce(1), you move everything in the RDD to a single partition, which then gives you one output file. It will still be called part-00000 or something like that because that's defined by the Hadoop API that Spark uses for reading from/writing to S3. I don't know of a way to change that. On Wed, Apr 30, 2014 at 2:47 PM, Peter thenephili...@yahoo.com wrote: Ah, looks like RDD.coalesce(1) solves one part of the problem. On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com wrote: Hi, Playing around with Spark and S3, I'm opening multiple objects (CSV files) with: val hfile = sc.textFile("s3n://bucket/2014-04-28/") so hfile is an RDD representing the 10 objects that were underneath 2014-04-28. After I've sorted and otherwise transformed the content, I'm trying to write it back to a single object: sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv") Unfortunately this results in a folder named concatted.csv with 10 objects underneath, part-00000 .. part-00010, corresponding to the 10 original objects loaded. How can I achieve the desired behaviour of putting a single object named concatted.csv? I've tried 0.9.1 and 1.0.0-RC3. Thanks! Peter
Re: What is Seq[V] in updateStateByKey?
S is the previous count, if any. Seq[V] are potentially many new counts. All of them have to be added together to keep an accurate total. It's as if the count were 3, and I tell you I've just observed 2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not 1 + 1. I butted in since I'd like to ask a different question about the same line of code. Why: val currentCount = values.foldLeft(0)(_ + _) instead of val currentCount = values.sum This happens a few places in the code. sum seems equivalent and likely quicker. Same with things like filter(_ == 200).size instead of count(_ == 200)... pretty trivial but hey. On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi TD, Why does the example keep recalculating the count via fold? Wouldn’t it make more sense to get the last count in values Seq and add 1 to it and save that as current count? From what Sean explained I understand that all values in Seq have the same key. Then when a new value for that key is found it is added to this Seq collection and the update function is called. Is my understanding correct?
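For readers following along, a sketch of the function shape being discussed, assuming the streaming stateful word-count example (the DStream name is hypothetical):

    // values holds the new per-batch counts for one key, state the running total so far
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val newCount = values.sum                // equivalent to values.foldLeft(0)(_ + _)
      Some(state.getOrElse(0) + newCount)      // carry the accumulated total forward
    }
    // wordDstream.updateStateByKey[Int](updateFunc)   // wordDstream is assumed, not from the thread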
Re: Reading multiple S3 objects, transforming, writing back one
Thanks Nicholas, this is a bit of a shame; it's not very practical for log roll-up, for example, when every output needs to be in its own directory. On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Yes, saveAsTextFile() will give you one part-file per RDD partition. When you coalesce(1), you move everything in the RDD to a single partition, which then gives you one output file. It will still be called part-00000 or something like that because that's defined by the Hadoop API that Spark uses for reading from/writing to S3. I don't know of a way to change that. On Wed, Apr 30, 2014 at 2:47 PM, Peter thenephili...@yahoo.com wrote: Ah, looks like RDD.coalesce(1) solves one part of the problem. On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com wrote: Hi, Playing around with Spark and S3, I'm opening multiple objects (CSV files) with: val hfile = sc.textFile("s3n://bucket/2014-04-28/") so hfile is an RDD representing the 10 objects that were underneath 2014-04-28. After I've sorted and otherwise transformed the content, I'm trying to write it back to a single object: sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv") Unfortunately this results in a folder named concatted.csv with 10 objects underneath, part-00000 .. part-00010, corresponding to the 10 original objects loaded. How can I achieve the desired behaviour of putting a single object named concatted.csv? I've tried 0.9.1 and 1.0.0-RC3. Thanks! Peter
Re: What is Seq[V] in updateStateByKey?
Yeah, I remember changing fold to sum in a few places, probably in testsuites, but missed this example I guess. On Wed, Apr 30, 2014 at 1:29 PM, Sean Owen so...@cloudera.com wrote: S is the previous count, if any. Seq[V] are potentially many new counts. All of them have to be added together to keep an accurate total. It's as if the count were 3, and I tell you I've just observed 2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not 1 + 1. I butted in since I'd like to ask a different question about the same line of code. Why: val currentCount = values.foldLeft(0)(_ + _) instead of val currentCount = values.sum This happens a few places in the code. sum seems equivalent and likely quicker. Same with things like filter(_ == 200).size instead of count(_ == 200)... pretty trivial but hey. On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi TD, Why does the example keep recalculating the count via fold? Wouldn’t it make more sense to get the last count in values Seq and add 1 to it and save that as current count? From what Sean explained I understand that all values in Seq have the same key. Then when a new value for that key is found it is added to this Seq collection and the update function is called. Is my understanding correct?
Re: NoSuchMethodError from Spark Java
Hi, One thing you can do is set the Spark version your project depends on to 1.0.0-SNAPSHOT (make sure it matches the version of Spark you're building); then, before building your project, run sbt publishLocal on the Spark tree. On Wed, Apr 30, 2014 at 12:11 AM, wxhsdp wxh...@gmail.com wrote: I fixed it. I made my sbt project depend on spark/trunk/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar and it works. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-from-Spark-Java-tp4937p5096.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Marcelo
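A sketch of the relevant build.sbt lines this suggestion implies, assuming Scala 2.10 and a Spark tree published locally with sbt publishLocal (the version strings are assumptions):

    scalaVersion := "2.10.4"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0-SNAPSHOT"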
[ANN]: Scala By The Bay Conference ( aka Silicon Valley Scala Symposium)
Hi, This is not related to Spark, but I thought you might be interested: the second SF Scala conference is coming this August. The SF Scala conference was called the Silicon Valley Scala Symposium last year. From now on, it will be known as Scala By The Bay. http://www.scalabythebay.org -- watch that space for announcements and the CFP! Chester
My talk on Spark: The Next Top (Compute) Model
I meant to post this last week, but this is a talk I gave at the Philly ETE conf. last week: http://www.slideshare.net/deanwampler/spark-the-next-top-compute-model Also here: http://polyglotprogramming.com/papers/Spark-TheNextTopComputeModel.pdf dean -- Dean Wampler, Ph.D. Typesafe @deanwampler http://typesafe.com http://polyglotprogramming.com
Re: Any advice for using big spark.cleaner.delay value in Spark Streaming?
Thanks for your reply. Sorry for the late response, I wanted to do some tests before writing back. The counting part works similarly to your advice: I specify a minimum interval like 1 minute, and for each hour, day, etc. it sums all counters of the current child intervals. However, when I want to count unique visitors for the month, things get much more complex. I need to merge 30 sets which contain visitor ids, and each of them has more than a few hundred thousand elements. Merging sets may still be a better option than keeping another set for the last month, but I'm not sure, because when there are many intersections it may be inefficient. BTW, I have one more question. The HLL example in the repository seems confusing to me. How does Spark handle global variable usage in the mapPartitions method? (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala#L68) I'm also a newbie, but I thought the map and mapPartitions methods are similar to Hadoop's map methods, so when we run the example on a cluster, how does a remote node reach a global variable that lives on a single node? Does Spark replicate HyperLogLogMonoid instances across the cluster? Thanks, Burak Emre -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-advice-for-using-big-spark-cleaner-delay-value-in-Spark-Streaming-tp4895p5131.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
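As a rough sketch (not from the thread) of the exact-count alternative being weighed above, the per-day id sets can also be kept as RDDs and merged with a single distinct(); the data source is left as an assumption:

    import org.apache.spark.rdd.RDD

    // hypothetical: one RDD of visitor ids per day of the month, loaded from wherever they live
    val dailyVisitorIds: Seq[RDD[String]] = ???

    val monthlyUniques = dailyVisitorIds
      .reduce(_ union _)   // concatenates partitions, no shuffle yet
      .distinct()          // one shuffle to deduplicate the ids
      .count()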
update of RDDs
In our application, we need distributed RDDs containing key-value maps. We have operations that update the RDDs by adding entries to the map, deleting entries from the map, as well as updating the value part of the map. We also have map-reduce functions that operate on the RDDs. The questions are the following. 1. Can RDDs be updated? If yes, what are the methods? 2. If we update RDDs, will it happen in place, or does it create new RDDs with almost double the original RDD size (original + newly created RDD)? Thank you very much. N.N.Murthy -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/update-of-RDDs-tp5132.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Strange lookup behavior. Possible bug?
Dear Sparkers, Has anyone got any insight on this? I am really stuck. Yadid On 4/28/14, 11:28 AM, Yadid Ayzenberg wrote: Thanks for your answer. I tried running on a single machine - master and worker on one host. I get exactly the same results. Very little CPU activity on the machine in question. The web UI shows a single task and its state is RUNNING. It will remain so indefinitely. I have a single partition, and its size is 1626.2 MB. Currently the RDD has 200 elements, but I have tried it with 20 and the behavior is the same. The key is of the form (0,52fb9aff3004f07d1a87c8ea), where the first number in the tuple is always 0, and the second one is some string that can appear more than once. The RDD is created by using newAPIHadoopRDD. Any additional info I can provide? Yadid On 4/28/14 10:46 AM, Daniel Darabos wrote: That is quite mysterious, and I do not think we have enough information to answer. JavaPairRDD<String, Tuple2>.lookup() works fine on a remote Spark cluster: $ MASTER=spark://localhost:7077 bin/spark-shell scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0 until 10, 3).map(x => ((x % 3).toString, (x, x % 3)))) scala> rdd.lookup("1") res0: java.util.List[(Int, Int)] = [(1,1), (4,1), (7,1)] You suggest maybe the driver does not receive a message from an executor. I guess it is likely possible, though it has not happened to me. I would recommend running on a single machine in the standalone setup. Start the master and worker on the same machine, and run the application there too. This should eliminate network configuration problems. If you still see the issue, I'd check whether the task has really completed. What do you see on the web UI? Is the executor using CPU? Good luck. On Mon, Apr 28, 2014 at 2:35 AM, Yadid Ayzenberg ya...@media.mit.edu wrote: Can someone please suggest how I can move forward with this? My Spark version is 0.9.1. The big challenge is that this issue is not recreated when running in local mode. What could be the difference? I would really appreciate any pointers, as currently the job just hangs. On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote: Some additional information - maybe this rings a bell with someone: I suspect this happens when the lookup returns more than one value. For 0 and 1 values, the function behaves as you would expect. Anyone? On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote: Hi All, I'm running a lookup on a JavaPairRDD<String, Tuple2>. When running on the local machine, the lookup is successful. However, when running on a standalone cluster with the exact same dataset, one of the tasks never ends (constantly in RUNNING status). When viewing the worker log, it seems that the task has finished successfully: 14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally 14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is 10896794 14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver 14/04/25 13:40:38 INFO Executor: Finished task ID 2 But it seems the driver is not aware of this, and hangs indefinitely. If I execute a count prior to the lookup, I get the correct number, which suggests that the cluster is operating as expected. The exact same scenario works with a different type of key (Tuple2): JavaPairRDD<Tuple2, Tuple2>. Any ideas on how to debug this problem? Thanks, Yadid
CDH 5.0 and Spark 0.9.0
Hello, So I was unable to run the following commands from the Spark shell with CDH 5.0 and Spark 0.9.0, see below. Once I removed the following property from the core-site.xml on the node, the Spark commands worked:

<property>
  <name>io.compression.codec.lzo.class</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
  <final>true</final>
</property>

Is there a specific setup I am missing?

scala> var log = sc.textFile("hdfs://jobs-ab-hnn1//input/core-site.xml")
14/04/30 22:43:16 INFO MemoryStore: ensureFreeSpace(78800) called with curMem=150115, maxMem=308713881
14/04/30 22:43:16 INFO MemoryStore: Block broadcast_1 stored as values to memory (estimated size 77.0 KB, free 294.2 MB)
14/04/30 22:43:16 WARN Configuration: mapred-default.xml:an attempt to override final parameter: mapreduce.tasktracker.cache.local.size; Ignoring.
14/04/30 22:43:16 WARN Configuration: yarn-site.xml:an attempt to override final parameter: mapreduce.output.fileoutputformat.compress.type; Ignoring.
14/04/30 22:43:16 WARN Configuration: hdfs-site.xml:an attempt to override final parameter: mapreduce.map.output.compress.codec; Ignoring.
log: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12

scala> log.count()
14/04/30 22:43:03 WARN JobConf: The variable mapred.child.ulimit is no longer used.
14/04/30 22:43:04 WARN Configuration: mapred-default.xml:an attempt to override final parameter: mapreduce.tasktracker.cache.local.size; Ignoring.
14/04/30 22:43:04 WARN Configuration: yarn-site.xml:an attempt to override final parameter: mapreduce.output.fileoutputformat.compress.type; Ignoring.
14/04/30 22:43:04 WARN Configuration: hdfs-site.xml:an attempt to override final parameter: mapreduce.map.output.compress.codec; Ignoring.
java.lang.IllegalArgumentException: java.net.UnknownHostException: jobs-a-hnn1
    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237)
    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:576)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:521)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:146)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:902)
    at org.apache.spark.rdd.RDD.count(RDD.scala:720)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
    at $iwC$$iwC$$iwC.<init>(<console>:20)
    at $iwC$$iwC.<init>(<console>:22)
    at $iwC.<init>(<console>:24)
    at <init>(<console>:26)
    at .<init>(<console>:30)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:795)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:840)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:752)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:600)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:607)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:610)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:935)
    at
same partition id means same location?
Hi, I'm just reviewing the advanced Spark features; it's about the PageRank example. It says that any shuffle operation on two RDDs will take on the partitioner of one of them, if one is set. So first we partition the Links by a HashPartitioner, then we join the Links and Ranks0. Ranks0 will take the HashPartitioner according to the document. The following reduceByKey operation also respects the HashPartitioner, so when we join Links and Ranks1, there is no shuffle at all. Does that mean partitions of different RDDs with the same id will go to exactly the same location, even if the different RDDs were located on different nodes originally? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/same-partition-id-means-same-location-tp5136.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
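A minimal sketch of the pattern described above, with made-up link data and assuming sc is in scope (as in the shell):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._

    // hypothetical link data: page -> outgoing links
    val links = sc.parallelize(Seq(("a", Seq("b", "c")), ("b", Seq("a"))))
      .partitionBy(new HashPartitioner(8))
      .cache()                               // keep the hash-partitioned links in memory

    var ranks = links.mapValues(_ => 1.0)    // mapValues preserves links' partitioner
    val contribs = links.join(ranks)         // co-partitioned, so this join needs no shuffle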
Re: Reading multiple S3 objects, transforming, writing back one
This is a consequence of the way the Hadoop files API works. However, you can (fairly easily) add code to just rename the file, because it will always produce the same filename. (heavy use of pseudo code) dir = "/some/dir" rdd.coalesce(1).saveAsTextFile(dir) f = new File(dir + "/part-00000") f.moveTo(somewhere else) dir.remove() It might be cool to add a utility called `saveAsSingleFile` or something that does this for you. In fact probably we should have called saveAsTextFile saveAsTextFiles to make it more clear... On Wed, Apr 30, 2014 at 2:00 PM, Peter thenephili...@yahoo.com wrote: Thanks Nicholas, this is a bit of a shame; it's not very practical for log roll-up, for example, when every output needs to be in its own directory. On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Yes, saveAsTextFile() will give you one part-file per RDD partition. When you coalesce(1), you move everything in the RDD to a single partition, which then gives you one output file. It will still be called part-00000 or something like that because that's defined by the Hadoop API that Spark uses for reading from/writing to S3. I don't know of a way to change that. On Wed, Apr 30, 2014 at 2:47 PM, Peter thenephili...@yahoo.com wrote: Ah, looks like RDD.coalesce(1) solves one part of the problem. On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com wrote: Hi, Playing around with Spark and S3, I'm opening multiple objects (CSV files) with: val hfile = sc.textFile("s3n://bucket/2014-04-28/") so hfile is an RDD representing the 10 objects that were underneath 2014-04-28. After I've sorted and otherwise transformed the content, I'm trying to write it back to a single object: sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv") Unfortunately this results in a folder named concatted.csv with 10 objects underneath, part-00000 .. part-00010, corresponding to the 10 original objects loaded. How can I achieve the desired behaviour of putting a single object named concatted.csv? I've tried 0.9.1 and 1.0.0-RC3. Thanks! Peter
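Spelled out with the Hadoop FileSystem API, one way that pseudo code could look in practice; this is a sketch, the bucket and paths are made up, and `rdd`/`sc` are assumed to be in scope:

    import org.apache.hadoop.fs.{FileSystem, Path}

    val dir = new Path("s3n://bucket/tmp-output")            // temporary output directory
    rdd.coalesce(1).saveAsTextFile(dir.toString)             // rdd is whatever is being saved

    val fs = FileSystem.get(dir.toUri, sc.hadoopConfiguration)
    fs.rename(new Path(dir, "part-00000"), new Path("s3n://bucket/concatted.csv"))
    fs.delete(dir, true)                                     // drop the leftover directory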
How to handle this situation: Huge File Shared by All maps and Each Computer Has one copy?
Hi there, I was wondering if somebody could give me some suggestions about how to handle this situation: I have a spark program, in which it reads a 6GB file first (Not RDD) locally, and then do the map/reduce tasks. This 6GB file contains information that will be shared by all the map tasks. Previously, I handled it using the broadcast function in Spark, which is like this: global_file = fileRead(filename) global_file.broadcast() rdd.map(ele = MapFunc(ele)) However, when running the spark program with a cluster of multiple computers, I found that the remote nodes waited forever for the broadcasting of the global_file. I think that it may not be a good solution to have each map task to load the global file by themselves, which would incur huge overhead. Actually, we have this global file in each node of our cluster. The ideal behavior I hope is that for each node, they can read this global file only from its local disk (and stay in memory), and then for all the map/reduce tasks scheduled to this node, it can share that data. Hence, the global file is neither like broadcasting variables, which is shared by all map/reduce tasks, nor private variables only seen by one map task. It is shared node-widely, which is read in each node only one time and shared by all the tasks mapped to this node. Could anybody tell me how to program in Spark to handle it? Thanks so much. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-handle-this-situation-Huge-File-Shared-by-All-maps-and-Each-Computer-Has-one-copy-tp5139.html Sent from the Apache Spark User List mailing list archive at Nabble.com.