Costs of transformations
Is it possible to bound the costs of operations such as flatMap() and collect() based on the size of the RDDs involved?
Re: Reading large files
Thanks. In both cases, does the driver need to have enough memory to contain the entire file? How do both these functions work when, for example, the binary file is 4GB and the available driver memory is less?

On Wed, May 6, 2015 at 1:54 PM, Ulanov, Alexander alexander.ula...@hp.com wrote:

SparkContext has two methods for reading binary files: binaryFiles (reads multiple binary files into an RDD) and binaryRecords (reads fixed-length records of a single binary file into an RDD). For example, I have a big binary file split into logical parts, so I can use binaryFiles. The possible problem is that the order of records between parts is not preserved, so I have to do a sortBy afterwards.

Alexander

From: Vijayasarathy Kannan [mailto:kvi...@vt.edu]
Sent: Wednesday, May 06, 2015 10:38 AM
To: user@spark.apache.org
Subject: Reading large files

Hi, Is there a way to read a large file in a parallel/distributed way? I have a single large binary file which I currently read on the driver program and then distribute to the executors (using groupBy(), etc.). I want to know if there's a way to make the executors each read a specific/unique portion of the file, or create RDDs from multiple portions of the file and finally union them. Thanks.
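A minimal sketch of the binaryFiles-plus-sortBy approach Alexander describes, assuming the logical parts are named so that lexicographic path order matches logical order (the glob and part naming here are hypothetical; sc is the thread's SparkContext):

    // Read every part file as (path, stream) pairs; reading happens on the
    // executors, not the driver, so the whole file never sits in driver memory.
    val parts = sc.binaryFiles("hdfs:///data/big-file-part-*")

    // Restore the original part order by sorting on the file path.
    val ordered = parts.sortBy { case (path, _) => path }

    // Materialize each part's bytes only where it is processed.
    val bytes = ordered.map { case (_, stream) => stream.toArray() }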
Reading large files
Hi, Is there a way to read a large file in a parallel/distributed way? I have a single large binary file which I currently read on the driver program and then distribute to the executors (using groupBy(), etc.). I want to know if there's a way to make the executors each read a specific/unique portion of the file, or create RDDs from multiple portions of the file and finally union them. Thanks.
Spark JVM default memory
Starting the master with /sbin/start-master.sh creates a JVM with only 512MB of memory. How do I change this default amount of memory? Thanks, Vijay
Re: Spark JVM default memory
I am not able to access the web UI for some reason. But the logs (being written while running my application) show that only 385MB are being allocated for each executor (i.e., slave node), while the executor memory I set is 16GB. This 385MB is not the same for each run either. It looks random (sometimes 1GB, sometimes 512MB, etc.).

On Mon, May 4, 2015 at 6:57 PM, Mohammed Guller moham...@glassbeam.com wrote:

Did you confirm through the Spark UI how much memory is getting allocated to your application on each worker?

Mohammed

From: Vijayasarathy Kannan [mailto:kvi...@vt.edu]
Sent: Monday, May 4, 2015 3:36 PM
To: Andrew Ash
Cc: user@spark.apache.org
Subject: Re: Spark JVM default memory

I am trying to read in a file (4GB). I tried setting both spark.driver.memory and spark.executor.memory to large values (say 16GB), but I still get a GC limit exceeded error. Any idea what I am missing?

On Mon, May 4, 2015 at 5:30 PM, Andrew Ash and...@andrewash.com wrote:

It's unlikely you need to increase the amount of memory on your master node, since it does simple bookkeeping. The majority of the memory pressure across a cluster is on the executor nodes. See the conf/spark-env.sh file for configuring heap sizes, and this section in the docs for more information on how to make these changes: http://spark.apache.org/docs/latest/configuration.html

On Mon, May 4, 2015 at 2:24 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

Starting the master with /sbin/start-master.sh creates a JVM with only 512MB of memory. How to change this default amount of memory?

Thanks, Vijay
Re: Spark JVM default memory
I am trying to read in a file (4GB). I tried setting both spark.driver.memory and spark.executor.memory to large values (say 16GB), but I still get a GC limit exceeded error. Any idea what I am missing?

On Mon, May 4, 2015 at 5:30 PM, Andrew Ash and...@andrewash.com wrote:

It's unlikely you need to increase the amount of memory on your master node, since it does simple bookkeeping. The majority of the memory pressure across a cluster is on the executor nodes. See the conf/spark-env.sh file for configuring heap sizes, and this section in the docs for more information on how to make these changes: http://spark.apache.org/docs/latest/configuration.html

On Mon, May 4, 2015 at 2:24 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

Starting the master with /sbin/start-master.sh creates a JVM with only 512MB of memory. How to change this default amount of memory?

Thanks, Vijay
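For reference, a sketch of setting the executor heap from application code; the app name is hypothetical, and note that the driver's own heap cannot be raised this way once its JVM is already running (use spark-submit's --driver-memory flag for that):

    import org.apache.spark.{SparkConf, SparkContext}

    // Executor memory must be set before the SparkContext is created.
    val conf = new SparkConf()
      .setAppName("MemoryDemo")              // hypothetical name
      .set("spark.executor.memory", "16g")
    val sc = new SparkContext(conf)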
Complexity of transformations in Spark
What is the complexity of transformations and actions in Spark, such as groupBy(), flatMap(), collect(), etc.? What attributes (such as the number of partitions) do we need to factor in while analyzing code that uses these operations?
Error running Spark on Cloudera
I am trying to run a Spark application using spark-submit on a cluster managed by Cloudera Manager. I get the error:

Exception in thread main java.io.IOException: Error in creating log directory: file:/user/spark/applicationHistory//app-20150408094126-0008

Adding the lines below to /etc/spark/conf/spark-defaults.conf didn't resolve it:

    spark.eventLog.dir=/user/spark/applicationHistory
    spark.eventLog.enabled=true

Any idea on what is missing?
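One hedged guess, based on the file: prefix in the error message: the event-log path is being resolved against the local filesystem rather than HDFS. Pointing it at HDFS explicitly (assuming the directory exists there and is writable by the spark user) would look like this in application code; the same two settings can equally go in spark-defaults.conf with the hdfs:// prefix:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs:///user/spark/applicationHistory")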
Re: Reading a large file (binary) into RDD
Thanks everyone for the inputs. I guess I will try out a custom implementation of InputFormat, but I have no idea where to start. Are there any code examples of this that might help?

On Fri, Apr 3, 2015 at 9:15 AM, Dean Wampler deanwamp...@gmail.com wrote:

This might be overkill for your needs, but the scodec parser combinator library might be useful for creating a parser. https://github.com/scodec/scodec

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 2, 2015 at 6:53 PM, java8964 java8...@hotmail.com wrote:

I think implementing your own InputFormat and using SparkContext.hadoopFile() is the best option for your case.

Yong

--
From: kvi...@vt.edu
Date: Thu, 2 Apr 2015 17:31:30 -0400
Subject: Re: Reading a large file (binary) into RDD
To: freeman.jer...@gmail.com
CC: user@spark.apache.org

The file has a specific structure, which I outline below. The input file is basically a representation of a graph.

    INT
    INT (A)
    LONG (B)
    A INTs (Degrees)
    A SHORTINTs (Vertex_Attribute)
    B INTs
    B INTs
    B SHORTINTs
    B SHORTINTs

A - number of vertices
B - number of edges (note that the INTs/SHORTINTs associated with these are edge attributes)

After reading in the file, I need to create two RDDs (one with vertices and the other with edges).

On Thu, Apr 2, 2015 at 4:46 PM, Jeremy Freeman freeman.jer...@gmail.com wrote:

Hm, that will indeed be trickier, because this method assumes records are the same byte size. Is the file an arbitrary sequence of mixed types, or is there structure, e.g. short, long, short, long, etc.? If you could post a gist with an example of the kind of file and how it should look once read in, that would be useful!

- jeremyfreeman.net @thefreemanlab

On Apr 2, 2015, at 2:09 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

Thanks for the reply. Unfortunately, in my case, the binary file is a mix of short and long integers. Is there any other way that could be of use here? My current method happens to have a large overhead (much more than the actual computation time). Also, I am short of memory at the driver when it has to read the entire file.

On Thu, Apr 2, 2015 at 1:44 PM, Jeremy Freeman freeman.jer...@gmail.com wrote:

If it's a flat binary file and each record is the same length (in bytes), you can use Spark's binaryRecords method (defined on the SparkContext), which loads records from one or more large flat binary files into an RDD. Here's an example in Python to show how it works:

    # write data from an array
    from numpy import random
    dat = random.randn(100, 5)
    f = open('test.bin', 'w')
    f.write(dat)
    f.close()

    # load the data back in
    from numpy import frombuffer
    nrecords = 5
    bytesize = 8
    recordsize = nrecords * bytesize
    data = sc.binaryRecords('test.bin', recordsize)
    parsed = data.map(lambda v: frombuffer(buffer(v, 0, recordsize), 'float'))

    # these should be equal
    parsed.first()
    dat[0, :]

Does that help?

- jeremyfreeman.net @thefreemanlab

On Apr 2, 2015, at 1:33 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

What are some efficient ways to read a large file into RDDs? For example, have several executors read a specific/unique portion of the file and construct RDDs. Is this possible to do in Spark? Currently, I am doing a line-by-line read of the file at the driver and constructing the RDD.
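As a starting point for the custom-InputFormat route: Hadoop already ships a FixedLengthInputFormat that handles the fixed-record-size case, so a sketch like the following (record length and path are assumptions for illustration) avoids writing a new InputFormat until variable-length records make it unavoidable:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{BytesWritable, LongWritable}
    import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat

    // Tell the input format how long each record is (12 bytes here, an
    // assumption for illustration).
    val hadoopConf = new Configuration(sc.hadoopConfiguration)
    FixedLengthInputFormat.setRecordLength(hadoopConf, 12)

    val records = sc.newAPIHadoopFile(
      "hdfs:///data/graph.bin",               // hypothetical path
      classOf[FixedLengthInputFormat],
      classOf[LongWritable],
      classOf[BytesWritable],
      hadoopConf
    ).map { case (_, value) => value.copyBytes() } // copy out of Hadoop's reused buffer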
Re: Reading a large file (binary) into RDD
Thanks for the reply. Unfortunately, in my case, the binary file is a mix of short and long integers. Is there any other way that could be of use here? My current method happens to have a large overhead (much more than the actual computation time). Also, I am short of memory at the driver when it has to read the entire file.

On Thu, Apr 2, 2015 at 1:44 PM, Jeremy Freeman freeman.jer...@gmail.com wrote:

If it's a flat binary file and each record is the same length (in bytes), you can use Spark's binaryRecords method (defined on the SparkContext), which loads records from one or more large flat binary files into an RDD. Here's an example in Python to show how it works:

    # write data from an array
    from numpy import random
    dat = random.randn(100, 5)
    f = open('test.bin', 'w')
    f.write(dat)
    f.close()

    # load the data back in
    from numpy import frombuffer
    nrecords = 5
    bytesize = 8
    recordsize = nrecords * bytesize
    data = sc.binaryRecords('test.bin', recordsize)
    parsed = data.map(lambda v: frombuffer(buffer(v, 0, recordsize), 'float'))

    # these should be equal
    parsed.first()
    dat[0, :]

Does that help?

- jeremyfreeman.net @thefreemanlab

On Apr 2, 2015, at 1:33 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

What are some efficient ways to read a large file into RDDs? For example, have several executors read a specific/unique portion of the file and construct RDDs. Is this possible to do in Spark? Currently, I am doing a line-by-line read of the file at the driver and constructing the RDD.
Reading a large file (binary) into RDD
What are some efficient ways to read a large file into RDDs? For example, have several executors read a specific/unique portion of the file and construct RDDs. Is this possible to do in Spark? Currently, I am doing a line-by-line read of the file at the driver and constructing the RDD.
Re: Reading a large file (binary) into RDD
The file has a specific structure, which I outline below. The input file is basically a representation of a graph.

    INT
    INT (A)
    LONG (B)
    A INTs (Degrees)
    A SHORTINTs (Vertex_Attribute)
    B INTs
    B INTs
    B SHORTINTs
    B SHORTINTs

A - number of vertices
B - number of edges (note that the INTs/SHORTINTs associated with these are edge attributes)

After reading in the file, I need to create two RDDs (one with vertices and the other with edges).

On Thu, Apr 2, 2015 at 4:46 PM, Jeremy Freeman freeman.jer...@gmail.com wrote:

Hm, that will indeed be trickier, because this method assumes records are the same byte size. Is the file an arbitrary sequence of mixed types, or is there structure, e.g. short, long, short, long, etc.? If you could post a gist with an example of the kind of file and how it should look once read in, that would be useful!

- jeremyfreeman.net @thefreemanlab

On Apr 2, 2015, at 2:09 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

Thanks for the reply. Unfortunately, in my case, the binary file is a mix of short and long integers. Is there any other way that could be of use here? My current method happens to have a large overhead (much more than the actual computation time). Also, I am short of memory at the driver when it has to read the entire file.

On Thu, Apr 2, 2015 at 1:44 PM, Jeremy Freeman freeman.jer...@gmail.com wrote:

If it's a flat binary file and each record is the same length (in bytes), you can use Spark's binaryRecords method (defined on the SparkContext), which loads records from one or more large flat binary files into an RDD. Here's an example in Python to show how it works:

    # write data from an array
    from numpy import random
    dat = random.randn(100, 5)
    f = open('test.bin', 'w')
    f.write(dat)
    f.close()

    # load the data back in
    from numpy import frombuffer
    nrecords = 5
    bytesize = 8
    recordsize = nrecords * bytesize
    data = sc.binaryRecords('test.bin', recordsize)
    parsed = data.map(lambda v: frombuffer(buffer(v, 0, recordsize), 'float'))

    # these should be equal
    parsed.first()
    dat[0, :]

Does that help?

- jeremyfreeman.net @thefreemanlab

On Apr 2, 2015, at 1:33 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

What are some efficient ways to read a large file into RDDs? For example, have several executors read a specific/unique portion of the file and construct RDDs. Is this possible to do in Spark? Currently, I am doing a line-by-line read of the file at the driver and constructing the RDD.
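Given the layout above, the fixed-size header can be read cheaply on the driver, and the counts A and B then give the byte offsets of each section. A sketch; the field meanings follow the layout in the post, the path is hypothetical, and big-endian encoding is assumed since that is what DataInputStream reads:

    import java.io.{DataInputStream, FileInputStream}

    val in = new DataInputStream(new FileInputStream("/data/graph.bin"))
    val firstInt = in.readInt()      // the leading INT in the layout
    val numVertices = in.readInt()   // A
    val numEdges = in.readLong()     // B
    in.close()

    // Section offsets now follow from A and B: e.g. the degree section is
    // numVertices * 4 bytes, the vertex attributes numVertices * 2 bytes, etc.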
Re: Unable to run Spark application
That is failing too, with: sbt.ResolveException: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.2.1

On Wed, Apr 1, 2015 at 1:24 PM, Marcelo Vanzin van...@cloudera.com wrote:

Try sbt assembly instead.

On Wed, Apr 1, 2015 at 10:09 AM, Vijayasarathy Kannan kvi...@vt.edu wrote:

Why do I get "Failed to find Spark assembly JAR. You need to build Spark before running this program."? I downloaded spark-1.2.1.tgz from the downloads page and extracted it. When I do sbt package inside my application, it works fine. But when I try to run my application, I get the above mentioned error.

--
Marcelo
Unable to run Spark application
Why do I get "Failed to find Spark assembly JAR. You need to build Spark before running this program."? I downloaded spark-1.2.1.tgz from the downloads page and extracted it. When I do sbt package inside my application, it works fine. But when I try to run my application, I get the above mentioned error.
Re: Unable to run Spark application
Managed to make sbt assembly work. I run into another issue now. When I do ./sbin/start-all.sh, the script fails saying JAVA_HOME is not set, although I have explicitly set that variable to point to the correct Java location. The same happens with the ./sbin/start-master.sh script. Any idea what I might be missing?

On Wed, Apr 1, 2015 at 1:32 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

That is failing too, with: sbt.ResolveException: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.2.1

On Wed, Apr 1, 2015 at 1:24 PM, Marcelo Vanzin van...@cloudera.com wrote:

Try sbt assembly instead.

On Wed, Apr 1, 2015 at 10:09 AM, Vijayasarathy Kannan kvi...@vt.edu wrote:

Why do I get "Failed to find Spark assembly JAR. You need to build Spark before running this program."? I downloaded spark-1.2.1.tgz from the downloads page and extracted it. When I do sbt package inside my application, it works fine. But when I try to run my application, I get the above mentioned error.

--
Marcelo
Problems with spark.akka.frameSize
Hi, I am encountering the following error with a Spark application:

Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 0:0 was 11257268 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.

I am doing the following in my code:

    var groupedEdges = graph.edges.groupBy[VertexId](groupBySrcId)
      .persist(StorageLevel.MEMORY_AND_DISK)

    while (condition) {
      var updates = groupedEdges.flatMap { edgesBySrc =>
        doWork(edgesBySrc, a, b)
      }
      updates.collect.foreach(println)
    }

    def doWork(edges: (VertexId, Iterable[Edge[(Int, Int, Int)]]),
               a: Double, b: Double): List[VertexId] = {
      // do something with edges and return a list of vertices
    }

Note that the attribute of each edge is a tuple with 3 elements. I encountered the above mentioned error only when the edge attribute is a tuple; the code doesn't run into this when the edge attribute is a single integer. I tried increasing spark.akka.frameSize (to say 100) and it worked without running into this issue.

Doing a broadcast does not seem appropriate because each task performing doWork() gets a different set of edges. However, the groups of edges remain the same all through. I was wondering if there is an efficient way to do what I'm doing, i.e., pass edgesBySrc efficiently to doWork() (or not pass it at all, or pass it just once the first time and have the tasks retain the sets of edges across iterations)?

Thanks
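For completeness, the frame-size change the poster describes, as a sketch (the value is in MB; 100 is the number from the post):

    import org.apache.spark.{SparkConf, SparkContext}

    // Must be set before the SparkContext is created.
    val conf = new SparkConf().set("spark.akka.frameSize", "100") // MB
    val sc = new SparkContext(conf)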
Issues with SBT and Spark
My current simple.sbt is:

    name := "SparkEpiFast"

    version := "1.0"

    scalaVersion := "2.11.4"

    libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.2.1" % "provided"

    libraryDependencies += "org.apache.spark" % "spark-graphx_2.11" % "1.2.1" % "provided"

When I do sbt package, it compiles successfully. But while running the application, I get:

Exception in thread main java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;

However, changing the Scala version to 2.10.4 and updating the dependency lines appropriately resolves the issue (no exception). Could anyone please point out what I am doing wrong?
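A likely explanation, though the thread never states it: the prebuilt Spark 1.2.1 distribution is compiled against Scala 2.10, and mixing a 2.11-built application with 2.10 jars produces exactly this kind of NoSuchMethodError at runtime. A sketch of a matching simple.sbt, using %% so sbt appends the Scala version suffix automatically:

    name := "SparkEpiFast"

    version := "1.0"

    scalaVersion := "2.10.4"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1" % "provided"

    libraryDependencies += "org.apache.spark" %% "spark-graphx" % "1.2.1" % "provided"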
Question on RDD groupBy and executors
Hi, I am doing a groupBy on an EdgeRDD like this:

    val groupedEdges = graph.edges.groupBy[VertexId](func0)

    while (true) {
      val info = groupedEdges.flatMap(func1).collect.foreach(func2)
    }

The groupBy distributes the data to different executors on different nodes in the cluster. Given a key K (a vertexId identifying a particular group in groupedEdges), is there a way to find details such as:

- which executor is responsible for K?
- which node in the cluster does the executor containing K reside on?
- how to access that specific executor (and possibly assign a task to it) from the driver?

Thanks.
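There is no public API for addressing a specific executor from the driver, but the partition that owns a key can be computed from the RDD's partitioner, which is the closest stable handle. A sketch (the key value is hypothetical; groupedEdges is the RDD from the question):

    // groupBy installs a partitioner on the result, so the partition index
    // for a key K is deterministic across iterations.
    val k: VertexId = 42L
    groupedEdges.partitioner match {
      case Some(p) => println(s"key $k lives in partition ${p.getPartition(k)}")
      case None    => println("no partitioner on this RDD")
    }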
PRNG in Scala
Hi, What pseudo-random-number generator does scala.util.Random use?
Re: Iterating on RDDs
As you suggested, I tried saving the grouped RDD and persisting it in memory before the iterations begin. The performance seems to be much better now. My previous comment that the run times doubled was based on a wrong observation. Thanks.

On Fri, Feb 27, 2015 at 10:27 AM, Vijayasarathy Kannan kvi...@vt.edu wrote:

Thanks. I tried persist() on the RDD. The runtimes appear to have doubled now (without persist() it was ~7s per iteration and now it's ~15s). I am running standalone Spark on an 8-core machine. Any thoughts on why the increase in runtime?

On Thu, Feb 26, 2015 at 4:27 PM, Imran Rashid iras...@cloudera.com wrote:

    val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
    // or whatever persistence makes more sense for you
    ...
    while (true) {
      val res = grouped.flatMap(F)
      res.collect.foreach(func)
      if (criteria) break
    }

On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan kvi...@vt.edu wrote:

Hi, I have the following use case.

(1) I have an RDD of edges of a graph (say R).
(2) Do a groupBy on R (by, say, source vertex) and call a function F on each group.
(3) Collect the results from the Fs and do some computation.
(4) Repeat the above steps until some criteria is met.

In (2), the groups are always going to be the same (since R is grouped by source vertex).

Question: Is R distributed every iteration (in (2)), or is it distributed only once when it is created?

A sample code snippet is below.

    while (true) {
      val res = R.groupBy[VertexId](G).flatMap(F)
      res.collect.foreach(func)
      if (criteria) break
    }

Since the groups remain the same, what is the best way to go about implementing the above logic?
Re: Iterating on RDDs
Thanks. I tried persist() on the RDD. The runtimes appear to have doubled now (without persist() it was ~7s per iteration and now it's ~15s). I am running standalone Spark on an 8-core machine. Any thoughts on why the increase in runtime?

On Thu, Feb 26, 2015 at 4:27 PM, Imran Rashid iras...@cloudera.com wrote:

    val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
    // or whatever persistence makes more sense for you
    ...
    while (true) {
      val res = grouped.flatMap(F)
      res.collect.foreach(func)
      if (criteria) break
    }

On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan kvi...@vt.edu wrote:

Hi, I have the following use case.

(1) I have an RDD of edges of a graph (say R).
(2) Do a groupBy on R (by, say, source vertex) and call a function F on each group.
(3) Collect the results from the Fs and do some computation.
(4) Repeat the above steps until some criteria is met.

In (2), the groups are always going to be the same (since R is grouped by source vertex).

Question: Is R distributed every iteration (in (2)), or is it distributed only once when it is created?

A sample code snippet is below.

    while (true) {
      val res = R.groupBy[VertexId](G).flatMap(F)
      res.collect.foreach(func)
      if (criteria) break
    }

Since the groups remain the same, what is the best way to go about implementing the above logic?
Iterating on RDDs
Hi, I have the following use case.

(1) I have an RDD of edges of a graph (say R).
(2) Do a groupBy on R (by, say, source vertex) and call a function F on each group.
(3) Collect the results from the Fs and do some computation.
(4) Repeat the above steps until some criteria is met.

In (2), the groups are always going to be the same (since R is grouped by source vertex).

Question: Is R distributed every iteration (in (2)), or is it distributed only once when it is created?

A sample code snippet is below.

    while (true) {
      val res = R.groupBy[VertexId](G).flatMap(F)
      res.collect.foreach(func)
      if (criteria) break
    }

Since the groups remain the same, what is the best way to go about implementing the above logic?
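A runnable rendering of the pattern Imran sketches above; Scala has no bare break, so the stopping criterion is made an explicit test, and G, F, func, and the criterion itself are placeholders from the thread:

    import org.apache.spark.storage.StorageLevel

    // Group once, persist, and reuse the grouped RDD across iterations so
    // the shuffle is not recomputed on every pass.
    val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)

    var done = false
    while (!done) {
      val res = grouped.flatMap(F).collect()
      res.foreach(func)
      done = criteria(res)   // hypothetical stopping test
    }
    grouped.unpersist()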
Re: Not able to update collections
I am a beginner to Scala/Spark. Could you please elaborate on how to make an RDD of the results of func() and collect?

On Tue, Feb 24, 2015 at 2:27 PM, Sean Owen so...@cloudera.com wrote:

They aren't the same 'lst'. One is on your driver. It gets copied to executors when the tasks are executed. Those copies are updated, but the updates will never be reflected in the local copy back in the driver. You may just wish to make an RDD of the results of func() and collect() them back to the driver.

On Tue, Feb 24, 2015 at 7:20 PM, kvvt kvi...@vt.edu wrote:

I am working on the below piece of code.

    var lst = scala.collection.mutable.MutableList[VertexId]()
    graph.edges.groupBy[VertexId](f).foreach { edgesBySrc =>
      lst ++= func(edgesBySrc)
    }
    println(lst.length)

Here, the final println() always says that the length of the list is 0. The list is non-empty (func() correctly prints the length of the returned list internally). I am not sure if I am doing the append correctly. Can someone point out what I am doing wrong?
Re: Not able to update collections
Thanks, but it still doesn't seem to work. Below is my entire code.

    var mp = scala.collection.mutable.Map[VertexId, Int]()

    var myRdd = graph.edges.groupBy[VertexId](f).flatMap { edgesBySrc =>
      func(edgesBySrc, a, b)
    }

    myRdd.foreach { node =>
      mp(node) = 1
    }

Values in mp do not get updated for any element in myRdd.

On Tue, Feb 24, 2015 at 2:39 PM, Sean Owen so...@cloudera.com wrote:

Instead of

    ...foreach { edgesBySrc => lst ++= func(edgesBySrc) }

try

    ...flatMap { edgesBySrc => func(edgesBySrc) }

or even more succinctly

    ...flatMap(func)

This returns an RDD that basically has the list you are trying to build, I believe. You can collect() it to the driver, but beware if it is a huge data set. If you really just mean to count the results, you can count() instead.

On Tue, Feb 24, 2015 at 7:35 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

I am a beginner to Scala/Spark. Could you please elaborate on how to make an RDD of the results of func() and collect?

On Tue, Feb 24, 2015 at 2:27 PM, Sean Owen so...@cloudera.com wrote:

They aren't the same 'lst'. One is on your driver. It gets copied to executors when the tasks are executed. Those copies are updated, but the updates will never be reflected in the local copy back in the driver. You may just wish to make an RDD of the results of func() and collect() them back to the driver.

On Tue, Feb 24, 2015 at 7:20 PM, kvvt kvi...@vt.edu wrote:

I am working on the below piece of code.

    var lst = scala.collection.mutable.MutableList[VertexId]()
    graph.edges.groupBy[VertexId](f).foreach { edgesBySrc =>
      lst ++= func(edgesBySrc)
    }
    println(lst.length)

Here, the final println() always says that the length of the list is 0. The list is non-empty (func() correctly prints the length of the returned list internally). I am not sure if I am doing the append correctly. Can someone point out what I am doing wrong?
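To make Sean's point concrete: the foreach runs on the executors, so the driver-side mp never sees the writes. Building the map on the driver after a collect() fixes it; a sketch reusing the poster's names (f, func, a, b are from the thread):

    val myRdd = graph.edges.groupBy[VertexId](f).flatMap { edgesBySrc =>
      func(edgesBySrc, a, b)
    }

    // collect() brings the results to the driver; fine for modest sizes.
    val mp = myRdd.collect().map(node => node -> 1).toMap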
Re: RDD groupBy
You are right. I was looking at the wrong logs. I ran it on my local machine and saw that the println actually wrote the vertex ids. I was then able to find the same in the executors' logs on the remote machine. Thanks for the clarification.

On Mon, Feb 23, 2015 at 2:00 PM, Sean Owen so...@cloudera.com wrote:

Here, println isn't happening on the driver. Are you sure you are looking at the right machine's logs? Yes, this may be parallelized over many machines.

On Mon, Feb 23, 2015 at 6:37 PM, kvvt kvi...@vt.edu wrote:

In the snippet below,

    graph.edges.groupBy[VertexId](f1).foreach { edgesBySrc =>
      f2(edgesBySrc).foreach { vertexId =>
        println(vertexId)
      }
    }

f1 is a function that determines how to group the edges (in my case it groups by source vertex). f2 is another function that does some computation on the edges and returns an iterable (Iterable[VertexId]).

Questions:
1. The problem is that println(vertexId) doesn't print anything. I have made sure that f2 doesn't return an empty iterable. I am not sure what I am missing here.
2. I am assuming that f2 is called for each group in parallel. Is this correct? If not, what is the correct way to operate on each group in parallel?

Thanks!
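For anyone hitting the same confusion: to see the values on the driver's stdout rather than in the executor logs, collect first (safe only when the result is small). A sketch with the thread's f1 and f2:

    graph.edges.groupBy[VertexId](f1)
      .flatMap(edgesBySrc => f2(edgesBySrc))
      .collect()               // pulls results to the driver
      .foreach(println)        // now prints in the driver's stdout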
Re: Processing graphs
Hi, Thanks for your reply. I basically want to check if my understanding of what parallelize() on RDDs does is correct. In my case, I create a vertex RDD and an edge RDD and distribute them by calling parallelize(). Now does Spark perform any operation on these RDDs in parallel? For example, if I apply groupBy on the edge RDD (grouping by source vertex) and call a function F on the grouped RDD, will F be applied to each group in parallel, and will Spark determine how to do this in parallel regardless of the number of groups? Thanks.

On Tue, Feb 17, 2015 at 5:03 PM, Yifan LI iamyifa...@gmail.com wrote:

Hi Kannan, I am not sure I have understood what your question is exactly, but maybe the reduceByKey or reduceByKeyLocally functionality is better suited to your need.

Best, Yifan LI

On 17 Feb 2015, at 17:37, Vijayasarathy Kannan kvi...@vt.edu wrote:

Hi, I am working on a Spark application that processes graphs and I am trying to do the following:

- group the vertices (key -> vertex, value -> set of its outgoing edges)
- distribute each key to separate processes and process them (like a mapper)
- reduce the results back at the main process

Does the groupBy functionality do the distribution by default? Do we have to explicitly use RDDs to enable automatic distribution? It'd be great if you could help me understand these and how to go about with the problem. Thanks.
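To illustrate Yifan's reduceByKey suggestion: when the per-group computation is an aggregation, reduceByKey combines values map-side before the shuffle, unlike groupBy, which moves every edge of a group to one place. A sketch for a degree-style count (graph is the thread's GraphX graph):

    import org.apache.spark.SparkContext._   // pair-RDD implicits for Spark 1.2

    // Count outgoing edges per source vertex without materializing groups.
    val outDegrees = graph.edges
      .map(e => (e.srcId, 1))
      .reduceByKey(_ + _)    // runs in parallel across partitions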
Processing graphs
Hi, I am working on a Spark application that processes graphs and I am trying to do the following:

- group the vertices (key -> vertex, value -> set of its outgoing edges)
- distribute each key to separate processes and process them (like a mapper)
- reduce the results back at the main process

Does the groupBy functionality do the distribution by default? Do we have to explicitly use RDDs to enable automatic distribution? It'd be great if you could help me understand these and how to go about with the problem. Thanks.