Hello everyone,

I am creating a graph from a `gz` compressed `json` file of `edge` and 
`vertices` type.

I have put the files in a dropbox folder [here][1]

I load and map these `json` records to create the `vertices` and `edge` types 
required by `graphx` like this:

    val vertices_raw = sqlContext.read.json("path/vertices.json.gz")
    val vertices = vertices_raw.rdd.map(row=> 
((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[Long]("index")))
    val verticesRDD: RDD[(VertexId, Long)] = vertices
    val edges_raw = sqlContext.read.json("path/edges.json.gz")
    val edgesRDD = 
edges_raw.rdd.map(row=>(Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong,
 row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, 
row.getAs[Double]("length"))))
    val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, 
edgesRDD).partitionBy(PartitionStrategy.RandomVertexCut)
    
I then use this `dijkstra` implementation I found to compute a shortest path 
between two vertices:
    
    def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
              var g2 = g.mapVertices(
            (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, 
List[VertexId]())
              )
              for (i <- 1L to g.vertices.count - 1) {
                val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
                  .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
                    (a, b) => if (a._2._2 < b._2._2) a else b)
                  ._1
    
                val newDistances: VertexRDD[(Double, List[VertexId])] =
                  g2.aggregateMessages[(Double, List[VertexId])](
                ctx => if (ctx.srcId == currentVertexId) {
                  ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ 
ctx.srcId))
                },
                (a, b) => if (a._1 < b._1) a else b
              )
            g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
              val newSumVal = newSum.getOrElse((Double.MaxValue, 
List[VertexId]()))
              (
                vd._1 || vid == currentVertexId,
                math.min(vd._2, newSumVal._1),
                if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
                )
            })
            }
    
              g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
            (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
              .productIterator.toList.tail
              ))
            }

I take two random vertex id's:

    val v1 = 4000000028222916L
    val v2 = 4000000031019012L
    
and compute the path between them:
    
    val results = dijkstra(my_graph, v1).vertices.map(_._2).collect

I am unable to compute this locally on my laptop without getting a 
stackoverflow error. I have 8GB RAM and 2.6 GHz Intel Core i5 processor. I can 
see that it is using 3 out of 4 cores available. I can load this graph and 
compute shortest on average around 10 paths per second with the `igraph` 
library in Python on exactly the same graph. Is this an inefficient means of 
computing paths? At scale, on multiple nodes the paths will compute (no 
stackoverflow error) but it is still 30/40seconds per path computation. I must 
be missing something. 

Thanks 

  [1]: https://www.dropbox.com/sh/9ug5ikr6j357q7j/AACDBR9UdM0g_ck_ykB8KXPXa?dl=0

Reply via email to