RE: GraphX partition problem
Hi Ankur,

We've built it from the git link you sent, and we don't get the exception anymore. However, we've been seeing strange nondeterministic behavior from GraphX. We compute connected components on a graph of ~900K edges. We ran the Spark job several times on the same input graph and got back different components each time. Furthermore, we construct the graph from an edge list, so there should not be any "singleton" components, yet in the output the vast majority (around 80%) of the components contain only a single vertex. Does that have something to do with the bugfix below? Can you advise on how to solve this issue?

Thanks,
Alex

From: Ankur Dave [mailto:ankurd...@gmail.com]
Sent: Thursday, May 22, 2014 6:59 PM
To: user@spark.apache.org
Subject: Re: GraphX partition problem

The fix will be included in Spark 1.0, but if you just want to apply the fix to 0.9.1, here's a hotfixed version of 0.9.1 that only includes PR #367: https://github.com/ankurdave/spark/tree/v0.9.1-handle-empty-partitions. You can clone and build this.

Ankur
http://www.ankurdave.com/

On Thu, May 22, 2014 at 4:53 AM, Zhicharevich, Alex <azhicharev...@ebay.com> wrote:

Hi,

I'm running a simple connected components job using GraphX (version 0.9.1). My input comes from an HDFS text file partitioned into 400 parts. When I run the code on a single part or a small number of files (like 20), it runs fine. As soon as I try to read more files (more than 30), I get an error and the job fails. From looking at the logs, I see the following exception:

java.util.NoSuchElementException: End of stream
    at org.apache.spark.util.NextIterator.next(NextIterator.scala:83)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
    at org.apache.spark.graphx.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:52)
    at org.apache.spark.graphx.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:51)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:456)

From searching the web, I see it's a known issue with GraphX:
https://github.com/apache/spark/pull/367
https://github.com/apache/spark/pull/497

Are there stable releases that include this fix? Should I clone the git repo and build it myself? How would you advise me to deal with this issue?

Thanks,
Alex
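[Editor's note: a minimal sketch of the checks Alex describes, for anyone reproducing this. The input path and partition count are hypothetical placeholders; the APIs are the GraphX 0.9.x ones used elsewhere in this thread.]

import org.apache.spark.SparkContext._
import org.apache.spark.graphx._

// Hypothetical input; replace with the real edge list and partition count
val g = GraphLoader.edgeListFile(sc, "hdfs:///path/to/edges", minEdgePartitions = 400)

// Run connected components twice on the same in-memory graph
val cc1 = g.connectedComponents.vertices
val cc2 = g.connectedComponents.vertices

// Vertices whose component label differs between the two runs;
// deterministic behavior means this count is 0
cc1.join(cc2).filter { case (_, (a, b)) => a != b }.count()

// Component size distribution; a graph built from an edge list
// should have no components of size 1
val sizes = cc1.map(_._2).countByValue()
sizes.count(_._2 == 1)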
Re: GraphX partition problem
I've been trying to reproduce this but I haven't succeeded so far. For example, on the web-Google graph (https://snap.stanford.edu/data/web-Google.html), I get the expected results both on v0.9.1-handle-empty-partitions and on master:

// Load web-Google and run connected components
import org.apache.spark.graphx._
val g = GraphLoader.edgeListFile(sc, "/Users/ankurdave/Downloads/web-Google.txt", minEdgePartitions = 8)
g.vertices.count // = 875713
val cc = g.connectedComponents.vertices.map(_._2).cache()
cc.count // = 875713
val counts = cc.countByValue
counts.values.sum // = 875713

// There should not be any single-vertex components, because we loaded an edge list
counts.count(_._2 == 0) // = 0
counts.count(_._2 == 1) // = 0
counts.count(_._2 == 2) // = 783
counts.count(_._2 == 3) // = 503

// The 3 smallest and largest components in the graph (with nondeterministic tiebreaking)
counts.toArray.sortBy(_._2).take(3) // = Array((418467,2), (272504,2), (719750,2))
counts.toArray.sortBy(_._2).takeRight(3) // = Array((1363,384), (1734,404), (0,855802))

Ankur
http://www.ankurdave.com/
RE: GraphX partition problem
I'm not sure about 1.2 TB, but I can give it a shot. Is there some way to persist intermediate results to disk? Does the whole graph have to be in memory?

Alex

From: Ankur Dave [mailto:ankurd...@gmail.com]
Sent: Monday, May 26, 2014 12:23 AM
To: user@spark.apache.org
Subject: Re: GraphX partition problem

Once the graph is built, edges are stored in parallel primitive arrays, so each edge should only take 20 bytes to store (srcId: Long, dstId: Long, attr: Int). Unfortunately, the current implementation in EdgePartitionBuilder uses an array of Edge objects as an intermediate representation for sorting, so each edge will additionally take something like 40 bytes during graph construction (srcId (8) + dstId (8) + attr (4) + uncompressed pointer (8) + object overhead (8) + padding (4)). So you'll need about 1.2 TB of memory in total (60 bytes * 20 billion). Does your cluster have this much memory?

If not, I've been meaning to write a custom sort routine that can operate directly on the three parallel arrays. This will reduce the memory requirements to about 400 GB.

Ankur
http://www.ankurdave.com/
RE: GraphX partition problem
Can we do better with Bagel somehow? Can we control how we store the graph?

From: Ankur Dave [mailto:ankurd...@gmail.com]
Sent: Monday, May 26, 2014 12:13 PM
To: user@spark.apache.org
Subject: Re: GraphX partition problem

GraphX only performs sequential scans over the edges, so we could in theory store them on disk and stream through them, but we haven't implemented this yet. In-memory storage is the only option for now.

Ankur
http://www.ankurdave.com/
RE: GraphX partition problem
Thanks Ankur,

Built it from git and it works great. I have another issue now: I'm trying to process a huge graph of about 20 billion edges with GraphX. I only load the file, compute connected components, and persist the result right back to disk. When working with subgraphs (~50M edges) this works well, but on the whole graph it seems to choke during graph construction. Can you advise on how to tune Spark's memory parameters for this task?

Thanks,
Alex

From: Ankur Dave [mailto:ankurd...@gmail.com]
Sent: Thursday, May 22, 2014 6:59 PM
To: user@spark.apache.org
Subject: Re: GraphX partition problem

The fix will be included in Spark 1.0, but if you just want to apply the fix to 0.9.1, here's a hotfixed version of 0.9.1 that only includes PR #367: https://github.com/ankurdave/spark/tree/v0.9.1-handle-empty-partitions. You can clone and build this.

Ankur
http://www.ankurdave.com/
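[Editor's note: for reference on the tuning question, a minimal sketch of the Spark 0.9-era settings usually involved. The property names are real Spark configuration keys; all values are placeholders, not recommendations from this thread.]

import org.apache.spark.{SparkConf, SparkContext}

// Placeholder values for illustration only; size these to the actual cluster
val conf = new SparkConf()
  .setAppName("graphx-connected-components")
  .set("spark.executor.memory", "60g")         // heap available to each executor
  .set("spark.storage.memoryFraction", "0.5")  // share of heap for cached RDDs
  .set("spark.shuffle.memoryFraction", "0.3")  // share of heap for shuffle buffers
  .set("spark.default.parallelism", "4000")    // more, smaller tasks lower per-task memory
val sc = new SparkContext(conf)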
Re: GraphX partition problem
Once the graph is built, edges are stored in parallel primitive arrays, so each edge should only take 20 bytes to store (srcId: Long, dstId: Long, attr: Int). Unfortunately, the current implementation in EdgePartitionBuilder uses an array of Edge objects as an intermediate representation for sorting, so each edge will additionally take something like 40 bytes during graph construction (srcId (8) + dstId (8) + attr (4) + uncompressed pointer (8) + object overhead (8) + padding (4)). So you'll need about 1.2 TB of memory in total (60 bytes * 20 billion). Does your cluster have this much memory?

If not, I've been meaning to write a custom sort routine that can operate directly on the three parallel arrays. This will reduce the memory requirements to about 400 GB.

Ankur
http://www.ankurdave.com/
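[Editor's note: the arithmetic in the message above, restated as a Scala sketch; the byte counts come from Ankur's breakdown, not from measurement.]

// Final representation: three parallel primitive arrays
val stored = 8 + 8 + 4                // srcId (Long) + dstId (Long) + attr (Int) = 20 bytes

// Additional cost per edge in the intermediate Array[Edge] used for sorting
val building = 8 + 8 + 4 + 8 + 8 + 4  // field copies + pointer + object overhead + padding = 40 bytes

val edges = 20e9                      // 20 billion edges
(stored + building) * edges / 1e12    // = 1.2 TB needed during graph construction
stored * edges / 1e9                  // = 400 GB with a sort over the primitive arrays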