Hi!

tl;dr: We're looking at potentially using Spark+GraphX to compute PageRank
over a 4 billion node + 128 billion edge graph on a regular (monthly)
basis, possibly growing larger in size over time. If anyone has hints /
tips / upcoming optimizations I should test out (or wants to contribute --
we'll pay the EC2 credits!) for running large-scale graph processing with
Spark+GraphX on EC2+S3, I'd love to hear it! =]

First, I must say, I'm quite excited by the rise of the Spark ecosystem --
Spark makes life so much easier -- and it's for that very reason I'm
looking to use it for some of our processing work at the tech non-profit
Common Crawl <http://commoncrawl.org/>. To improve our crawl, we're aiming
to run PageRank monthly on our crawl archives. As we can run the whole
pipeline in Spark+GraphX, it's really quite a tempting proposition for us.

For that reason, I've been looking at replicating the existing experiments
from the GraphX: Unifying Data-Parallel and Graph-Parallel Analytics
<http://arxiv.org/abs/1402.2394> paper to make sure my general setup and
experimental methodology are sound before attempting to scale up to the
larger dataset. One issue is that the paper states the hardware used, but
not the convergence tolerance or number of iterations for PageRank. I've
seen a separate figure in a Spark presentation
<http://www.graphanalysis.org/IPDPS2014-workshop/Gonzales.pdf> that reports
68 seconds for 10 iterations, but it's not clear whether 10 iterations is a
comparable setup. I'm using the Spark EC2 spin-up scripts, and other than
some minor issues such as Ganglia failing [1], getting a cluster up and
running has been smooth sailing.
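
For what it's worth, GraphX exposes both a fixed-iteration and a
tolerance-based PageRank, which is exactly the ambiguity above; a minimal
sketch of the two (the edge-list path is just a placeholder):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.GraphLoader

    // Sketch only: fixed-iteration vs convergence-based PageRank in GraphX.
    // The edge-list path is a placeholder.
    def runBoth(sc: SparkContext): Unit = {
      val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/soc-LiveJournal1.txt")

      // Fixed number of iterations: directly comparable between runs.
      val fixedRanks = graph.staticPageRank(10).vertices

      // Run until the per-vertex change drops below the tolerance; the
      // runtime then depends entirely on the tolerance chosen.
      val convergedRanks = graph.pageRank(0.0001).vertices

      println(s"fixed: ${fixedRanks.count()}, converged: ${convergedRanks.count()}")
    }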

*Hardware:* All experiments were run on either 16 m2.4xlarge nodes or 32
r3.xlarge machines. Both configurations have comparable RAM and CPU to the
machines used in the paper, while having SSDs instead of magnetic disks.
I've found that more machines work better for downloading large amounts of
data from S3, where our "target" dataset will be stored.

*Experiments (30 iterations of PageRank, excluding data loading time)*

*LiveJournal:*
Total runtime: [4:19|4:32|5:16|...]
Loading: [1:36|1:48|...]
Per iteration: 6 seconds

*Twitter (41 million nodes, 1.4 billion edges):*
Total runtime: 28 minutes
Loading: 4.3 minutes
Per iteration: 47 seconds
(Special note: with memory+disk serialization and compressed RDDs [snappy],
it's ~100 seconds per iteration, or 54 minutes in total with 4.5 minutes of
loading.)
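
Roughly, that run corresponds to settings like the following -- the keys
are standard Spark configuration, though the values here are illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.graphx.Graph
    import org.apache.spark.storage.StorageLevel

    // Sketch of the "memory+disk serialization and compressed RDD [snappy]"
    // setup: compress cached RDD blocks with snappy...
    val conf = new SparkConf()
      .setAppName("pagerank-serialized")
      .set("spark.rdd.compress", "true")
      .set("spark.io.compression.codec", "snappy")

    // ...and keep the graph serialized, spilling to disk when it won't fit.
    def persistSerialized[VD, ED](graph: Graph[VD, ED]): Graph[VD, ED] =
      graph.persist(StorageLevel.MEMORY_AND_DISK_SER)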

*WebDataCommons PLD (43 million nodes, 623 million edges):*
This should be on a similar scale to the Twitter graph (41 million nodes,
1.4 billion edges) and it's a smaller representation of what I'm aiming to
tackle.
Data freely available here (2.7GB):
http://webdatacommons.org/hyperlinkgraph/#toc2

Total runtime: 45 minutes
Loading: 25 minutes (my fault: a single file that I didn't split
beforehand, and it was gzip rather than bzip2, so it wasn't splittable --
see the sketch below)
Per iteration: 40 seconds
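
Next time the fix is simple enough: repartition the raw text before
building the graph. A rough sketch (path, separator, and partition count
are placeholders):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.{Edge, Graph}

    // Sketch: a gzip file arrives as a single partition, so spread it out
    // before doing anything expensive. Path and partition count are
    // placeholders, and I'm assuming one "srcId<TAB>dstId" pair per line.
    def loadGzippedEdgeList(sc: SparkContext): Graph[Int, Int] = {
      val lines = sc.textFile("s3n://my-bucket/pld-arc.gz").repartition(256)
      val edges = lines.map { line =>
        val parts = line.split("\t")
        Edge(parts(0).toLong, parts(1).toLong, 1)
      }
      Graph.fromEdges(edges, 1)
    }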

*20/697 files of the WebDataCommons page graph (4 billion nodes, 128
billion edges):*
Data freely available here (331GB for the full dataset):
http://webdatacommons.org/hyperlinkgraph/#toc2

Vertices: 157155245, Edges: 3611537984
Consistent "java.lang.Long cannot be cast to scala.Tuple2" class cast
exceptions [2] produced by the SortShuffleWriter / ExternalSorter but it's
slowly getting done - primarily as some of the partitions don't seem to
need the external sort or don't trigger the error. Potentially due to using
SORT instead of HASH, but I've not tested that yet.
Per iteration: 5-6 minutes (didn't complete the full job)
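
When I do test it, it should only be a config flip, something like:

    import org.apache.spark.SparkConf

    // Sketch: switch from the sort-based shuffle back to the hash-based one
    // to see whether the SortShuffleWriter / ExternalSorter cast exception
    // disappears ("sort" and "hash" are the two valid values).
    val conf = new SparkConf()
      .setAppName("pagerank-hash-shuffle")
      .set("spark.shuffle.manager", "hash")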

*Both the 50/697 and 100/697 experiments (failed):*
50 / 697 has "Vertices: 326314372, Edges: 9072089327"
100 / 697 couldn't report the graph size

Java heap space exceptions everywhere :P
The OpenHashSet in EdgePartitionBuilder
<https://github.com/amplab/graphx/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala#L55>
seems to be exceeded -- possibly because a single partition can still
contain millions or even billions of unique nodes.
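
Presumably the workaround is to build the graph with far more edge
partitions (and possibly repartition with the 2D strategy) so each
partition sees fewer distinct vertices. A rough sketch with a made-up path
and counts:

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

    // Sketch: more edge partitions up front means fewer edges (and fewer
    // distinct vertices) per partition while the EdgePartitions are built;
    // EdgePartition2D then caps how widely any vertex is replicated.
    // Path and partition count are placeholders.
    def loadWithManyPartitions(sc: SparkContext) = {
      val graph = GraphLoader.edgeListFile(
        sc, "s3n://my-bucket/page-graph/part-*", false, 4096)
      graph.partitionBy(PartitionStrategy.EdgePartition2D)
    }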

*Code:*
I've put the code up on GitHub under the terrible name graphx-prank
<https://github.com/Smerity/graphx-prank>.

The code lets me run large graphs locally on my laptop for testing,
whereas the bundled GraphX PageRank example tends to fail at the
memory-intensive parts. I can run LiveJournal on my underpowered laptop
using my code, for example, but not using the GraphX PageRank example.

The aim is for the code to be used as part of a pipeline at Common Crawl
for extracting a hyperlink graph, computing PageRank over it, then storing
the results to determine what we crawl next.
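
The tail end of that pipeline is simple enough; roughly (tolerance and
output path are placeholders):

    import scala.reflect.ClassTag

    import org.apache.spark.graphx.Graph

    // Sketch of the final stage: run PageRank to convergence and write one
    // "vertexId<TAB>rank" line per vertex back out to S3.
    def saveRanks[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Unit = {
      val ranks = graph.pageRank(0.0001).vertices
      ranks.map { case (id, rank) => s"$id\t$rank" }
        .saveAsTextFile("s3n://my-bucket/pagerank-output")
    }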

*General questions and comments:*

(a) Is a graph of this size sane to use with GraphX yet, given 50
m2.4xlarge nodes or 100 r3.xlarge machines? I know optimizations for
insanely large graphs are coming, but am I potentially a little early?
(b) Any improvements / optimizations for running Spark and GraphX at this
scale?
(c) Are there any other example scripts in the wild, especially ones
showing good techniques or optimized Spark/GraphX usage? I'm new to Scala,
Spark, and GraphX, so I'm always looking for good resources.

[1]: I think it's related to *Starting httpd: httpd: Syntax error on line
153 of /etc/httpd/conf/httpd.conf: Cannot load modules/mod_authn_alias.so
into server: /etc/httpd/modules/mod_authn_alias.so: cannot open shared
object file: No such file or directory* but I've not spent time
investigating, as Ganglia is less of a concern for me right now.

[2]: java.lang.ClassCastException: java.lang.Long cannot be cast to
scala.Tuple2
        org.apache.spark.graphx.impl.RoutingTableMessageSerializer$$anon$1$$anon$2.writeObject(Serializers.scala:39)
        org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
        org.apache.spark.util.collection.ExternalSorter.spillToMergeableFile(ExternalSorter.scala:350)
        org.apache.spark.util.collection.ExternalSorter.spill(ExternalSorter.scala:285)
        org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:262)
        org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:233)
        org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:74)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

-- 
Regards,
Stephen Merity
Data Scientist @ Common Crawl
