Hi Gerard,
How are you starting Spark? Are you allocating enough RAM for processing? I
think the default is 512 MB. Try the following and see if it helps
(depending on the size of your dataset, you might not need all 8 GB).
$SPARK_HOME/bin/spark-shell \
--master local[4] \
--executor-memory 8G \
--driver-memory 8G
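
To confirm that the setting took effect, a quick check like the one below
can be pasted into the shell. It is only a sketch: it uses the plain JVM
`Runtime` API rather than anything Spark-specific, and the name `maxHeapMb`
is just for illustration. As far as I know, in `local` mode everything runs
inside the driver JVM, so `--driver-memory` is the flag that should matter
here.

```scala
// Report the maximum heap the current JVM may use, in megabytes.
// Runtime.maxMemory returns a value in bytes.
val maxHeapMb: Long = Runtime.getRuntime.maxMemory / (1024L * 1024L)
println(s"Max JVM heap: $maxHeapMb MB")
```

If the number printed is well below what you asked for, the flags are
probably not reaching the JVM.
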
Thank you,
Irving Duran
On Tue, Jan 10, 2017 at 12:20 PM, Gerard Casey wrote:
> Hello everyone,
>
> I am creating a graph from `gz`-compressed `json` files of `edge` and
> `vertices` records.
>
> I have put the files in a Dropbox folder [here][1].
>
> I load and map these `json` records to create the `vertices` and `edge`
> types required by `graphx` like this:
>
>     import org.apache.spark.graphx._
>     import org.apache.spark.rdd.RDD
>
>     // Vertex ids are the "toid" values with the "osgb" prefix stripped
>     val vertices_raw = sqlContext.read.json("path/vertices.json.gz")
>     val vertices = vertices_raw.rdd.map(row =>
>       (row.getAs[String]("toid").stripPrefix("osgb").toLong, row.getAs[Long]("index")))
>     val verticesRDD: RDD[(VertexId, Long)] = vertices
>
>     // Each edge carries its "length" as the edge attribute
>     val edges_raw = sqlContext.read.json("path/edges.json.gz")
>     val edgesRDD = edges_raw.rdd.map(row =>
>       Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong,
>         row.getAs[String]("negativeNode").stripPrefix("osgb").toLong,
>         row.getAs[Double]("length")))
>     val my_graph: Graph[Long, Double] = Graph.apply(verticesRDD, edgesRDD)
>       .partitionBy(PartitionStrategy.RandomVertexCut)
>
> I then use this `dijkstra` implementation I found to compute a shortest
> path between two vertices:
>
>     def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
>       // Vertex state: (visited, best distance so far, path from origin)
>       var g2 = g.mapVertices(
>         (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
>       )
>       for (i <- 1L to g.vertices.count - 1) {
>         // Pick the unvisited vertex with the smallest tentative distance
>         val currentVertexId: VertexId =
>           g2.vertices.filter(!_._2._1)
>             .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
>               (a, b) => if (a._2._2 < b._2._2) a else b)
>             ._1
>
>         // Offer a new distance and path to each neighbour of that vertex
>         val newDistances: VertexRDD[(Double, List[VertexId])] =
>           g2.aggregateMessages[(Double, List[VertexId])](
>             ctx => if (ctx.srcId == currentVertexId) {
>               ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
>             },
>             (a, b) => if (a._1 < b._1) a else b
>           )
>
>         // Mark the vertex as visited and keep the shorter distance and path
>         g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
>           val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]()))
>           (
>             vd._1 || vid == currentVertexId,
>             math.min(vd._2, newSumVal._1),
>             if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
>           )
>         })
>       }
>
>       g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
>         (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
>           .productIterator.toList.tail
>         ))
>     }
>
> I take two random vertex IDs:
>
>     val v1 = 400028222916L
>     val v2 = 400031019012L
>
> and compute the path between them:
>
>     val results = dijkstra(my_graph, v1).vertices.map(_._2).collect
>
> I am unable to compute this locally on my laptop without getting a stack
> overflow error. I have 8 GB of RAM and a 2.6 GHz Intel Core i5 processor,
> and I can see that it is using 3 of the 4 available cores. With the
> `igraph` library in Python I can load exactly the same graph and compute
> around 10 shortest paths per second on average. Is this an inefficient
> means of computing paths? At scale, on multiple nodes, the paths do compute
> (no stack overflow error), but it still takes 30 to 40 seconds per path
> computation. I must be missing something.
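
On the efficiency question: the GraphX loop above iterates
`g.vertices.count - 1` times and runs several distributed operations on
each pass, so per-path latency will likely stay well above what a
single-machine library gives. For comparison, a plain in-memory Dijkstra can
be sketched as below. This is only an illustration under my own assumptions:
`localDijkstra`, the adjacency-map representation, and the toy graph are
hypothetical and are not taken from your code or from GraphX.

```scala
import scala.collection.mutable

// Single-machine Dijkstra over an adjacency map: vertex -> (neighbour, edge length).
// Returns the shortest distance to every vertex reachable from `origin`.
def localDijkstra(adj: Map[Long, Seq[(Long, Double)]], origin: Long): Map[Long, Double] = {
  val dist = mutable.Map(origin -> 0.0)
  // PriorityQueue dequeues the "largest" element, so treat a smaller
  // distance as larger in order to pop the closest vertex first.
  val closestFirst = Ordering.fromLessThan[(Double, Long)]((a, b) => a._1 > b._1)
  val queue = mutable.PriorityQueue((0.0, origin))(closestFirst)
  while (queue.nonEmpty) {
    val (d, u) = queue.dequeue()
    // Skip stale entries that were superseded by a shorter path.
    if (d <= dist.getOrElse(u, Double.MaxValue)) {
      for ((v, w) <- adj.getOrElse(u, Seq.empty)) {
        val candidate = d + w
        if (candidate < dist.getOrElse(v, Double.MaxValue)) {
          dist(v) = candidate
          queue.enqueue((candidate, v))
        }
      }
    }
  }
  dist.toMap
}

// Tiny hypothetical graph, not the dataset from this thread.
val toy = Map(
  1L -> Seq((2L, 1.0), (3L, 4.0)),
  2L -> Seq((3L, 2.0)),
  3L -> Seq.empty[(Long, Double)]
)
val toyDistances = localDijkstra(toy, 1L)
```

If the whole graph fits in memory on one machine, as it does for `igraph`,
an approach along these lines may be a better fit than a distributed one.
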
>
> Thanks
>
> [1]: https://www.dropbox.com/sh/9ug5ikr6j357q7j/AACDBR9UdM0g_ck_ykB8KXPXa?dl=0
>