Re: Shortest path performance in Graphx with Spark

2017-01-11 Thread Irving Duran
Hi Gerard,
How are you starting Spark? Are you allocating enough RAM for processing? I
think the default is 512 MB. Try the following and see if it helps (based on
the size of your dataset, you might not need all 8 GB).

$SPARK_HOME/bin/spark-shell \
  --master local[4] \
  --executor-memory 8G \
  --driver-memory 8G
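
Once the shell is up, you can sanity-check that the setting took effect — just
the conf lookup plus a plain JVM heap readout, nothing Spark-specific:

// Run inside spark-shell to confirm the driver memory was actually applied.
println(sc.getConf.getOption("spark.driver.memory"))           // requested value, if set
println(s"${Runtime.getRuntime.maxMemory / (1024 * 1024)} MB") // heap ceiling the JVM sees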



Thank You,

Irving Duran



Shortest path performance in Graphx with Spark

2017-01-10 Thread Gerard Casey
Hello everyone,

I am creating a graph from `gz`-compressed `json` files of `edge` and
`vertex` records.

I have put the files in a Dropbox folder [here][1].

I load and map these `json` records to create the vertex and edge RDDs
required by `graphx` like this:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Parse vertices: strip the "osgb" prefix from the id, keep the index attribute.
val vertices_raw = sqlContext.read.json("path/vertices.json.gz")
val vertices = vertices_raw.rdd.map(row =>
  (row.getAs[String]("toid").stripPrefix("osgb").toLong, row.getAs[Long]("index")))
val verticesRDD: RDD[(VertexId, Long)] = vertices

// Parse edges: source, destination, and the length attribute as the edge weight.
val edges_raw = sqlContext.read.json("path/edges.json.gz")
val edgesRDD = edges_raw.rdd.map(row => Edge(
  row.getAs[String]("positiveNode").stripPrefix("osgb").toLong,
  row.getAs[String]("negativeNode").stripPrefix("osgb").toLong,
  row.getAs[Double]("length")))

val my_graph: Graph[Long, Double] = Graph(verticesRDD,
  edgesRDD).partitionBy(PartitionStrategy.RandomVertexCut)
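
Since gzipped JSON is not splittable and the graph is otherwise rebuilt lazily
on each action, it may be worth caching it once before querying it repeatedly,
e.g.:

// Sketch: keep the parsed graph in memory so repeated traversals
// do not go back to the gzipped JSON every time.
my_graph.cache()
my_graph.vertices.count()  // one action to materialize it up front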

I then use this `dijkstra` implementation I found to compute a shortest path 
between two vertices:

def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
  // Vertex state: (visited?, tentative distance, path back to the origin).
  var g2 = g.mapVertices(
    (vid, vd) => (false, if (vid == origin) 0.0 else Double.MaxValue,
      List[VertexId]())
  )

  for (i <- 1L to g.vertices.count - 1) {
    // Pick the unvisited vertex with the smallest tentative distance.
    val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
      .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
        (a, b) => if (a._2._2 < b._2._2) a else b)
      ._1

    // Relax all edges leaving the current vertex.
    val newDistances: VertexRDD[(Double, List[VertexId])] =
      g2.aggregateMessages[(Double, List[VertexId])](
        ctx => if (ctx.srcId == currentVertexId) {
          ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
        },
        (a, b) => if (a._1 < b._1) a else b
      )

    // Mark the current vertex visited and keep the smaller distance/path.
    g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
      val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]()))
      (
        vd._1 || vid == currentVertexId,
        math.min(vd._2, newSumVal._1),
        if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
      )
    })
  }

  // Re-attach the original vertex attribute alongside (distance, path),
  // dropping the visited flag.
  g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
    (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
      .productIterator.toList.tail
    ))
}
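
One thing I notice about this loop: `g2` is reassigned on every iteration, so
its RDD lineage grows with the vertex count, and serializing a lineage that
deep is a known cause of stack overflows in Spark. A sketch of periodic
checkpointing that should truncate it (the directory and interval are
placeholders I have not tuned):

// Before the loop: give Spark somewhere to write checkpoints.
sc.setCheckpointDir("/tmp/spark-checkpoints")  // placeholder path

// Inside the for-loop, after reassigning g2:
if (i % 25 == 0) {            // placeholder interval
  g2.cache()
  g2.checkpoint()             // Graph.checkpoint() truncates the RDD lineage
  g2.vertices.count()         // force the checkpoint to materialize
}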

I take two random vertex IDs:

val v1 = 400028222916L
val v2 = 400031019012L

and compute the path between them:

val results = dijkstra(my_graph, v1).vertices.map(_._2).collect
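
Note that this collects every vertex to the driver; since I only need the
path to `v2`, filtering first should be lighter:

// Bring back only the entry for the target vertex v2.
val resultForV2 = dijkstra(my_graph, v1).vertices.filter(_._1 == v2).collect()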

I am unable to compute this locally on my laptop without getting a
StackOverflowError. I have 8 GB of RAM and a 2.6 GHz Intel Core i5 processor,
and I can see that Spark is using 3 of the 4 available cores. On exactly the
same graph, the `igraph` library in Python computes around 10 shortest paths
per second on average. Is this an inefficient way of computing paths? At
scale, on multiple nodes, the paths do compute (no StackOverflowError), but
each path still takes 30-40 seconds. I must be missing something.
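
For comparison, the Pregel-based single-source shortest-path pattern from the
GraphX programming guide relaxes every frontier vertex in parallel in each
superstep, rather than visiting one vertex per iteration as above. A sketch
(distances only, no path lists; untested on my graph):

import org.apache.spark.graphx._

// SSSP via the Pregel operator (adapted from the GraphX programming guide).
// Converges in roughly O(graph diameter) supersteps instead of one
// iteration per vertex.
def sssp(g: Graph[Long, Double], origin: VertexId): Graph[Double, Double] = {
  val initial = g.mapVertices((vid, _) =>
    if (vid == origin) 0.0 else Double.PositiveInfinity)
  initial.pregel(Double.PositiveInfinity)(
    (vid, dist, newDist) => math.min(dist, newDist),  // vertex program
    triplet =>                                        // send improved distances
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      else Iterator.empty,
    (a, b) => math.min(a, b)                          // merge incoming messages
  )
}

If the per-vertex loop really is the bottleneck, this alone may account for
much of the gap against `igraph`.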

Thanks 

  [1]: https://www.dropbox.com/sh/9ug5ikr6j357q7j/AACDBR9UdM0g_ck_ykB8KXPXa?dl=0