Found the exact issue. If the vertex attribute is a mutable object, the edge triplets do not see the updated vertex state after the vertex attributes have been shipped; with immutable attribute objects there is no problem. Below is code that reproduces this: simply changing the mutable HashMap to an immutable HashMap makes it work. (This is not a fix for the bug; either this limitation should be documented for users, or the bug should be fixed for mutable objects.)

import org.apache.spark.graphx._
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.log4j.{ Level, Logger }
import scala.collection.mutable.HashMap


object PregelTest {
  val logger = Logger.getLogger(getClass().getName());
  def run(graph: Graph[HashMap[String, Int], HashMap[String, Int]])
      : Graph[HashMap[String, Int], HashMap[String, Int]] = {

    def vProg(v: VertexId, attr: HashMap[String, Int], msg: Integer): HashMap[String, Int] = {
      var updatedAttr = attr

      if (msg < 0) {
        // init message received: vertex 0 is the source, all others start at "infinity"
        if (v == 0L) updatedAttr = attr.+=("LENGTH" -> 0)
        else updatedAttr = attr.+=("LENGTH" -> Integer.MAX_VALUE)
      } else {
        // regular message: update the distance from the received value
        updatedAttr = attr.+=("LENGTH" -> (msg + 1))
      }
      updatedAttr
    }

    def sendMsg(triplet: EdgeTriplet[HashMap[String, Int], HashMap[String, Int]]): Iterator[(VertexId, Integer)] = {
      val len = triplet.srcAttr("LENGTH")
      // only send a msg if the last hop is reachable
      if (len < Integer.MAX_VALUE) Iterator((triplet.dstId, len))
      else Iterator.empty
    }

    def mergeMsg(msg1: Integer, msg2: Integer): Integer = {
      if (msg1 < msg2) msg1 else msg2
    }

    Pregel(graph, new Integer(-1), 3, EdgeDirection.Either)(vProg, sendMsg, mergeMsg)
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Pregel Test")
    conf.set("spark.master", "local")
    val sc = new SparkContext(conf)

    // create a simplest test graph with 3 nodes and 2 edges 
    val vertexList = Array(
      (0.asInstanceOf[VertexId], new HashMap[String, Int]),
      (1.asInstanceOf[VertexId], new HashMap[String, Int]),
      (2.asInstanceOf[VertexId], new HashMap[String, Int]))
    val edgeList = Array(
      Edge(0.asInstanceOf[VertexId], 1.asInstanceOf[VertexId], new HashMap[String, Int]),
      Edge(1.asInstanceOf[VertexId], 2.asInstanceOf[VertexId], new HashMap[String, Int]))

    val vertexRdd = sc.parallelize(vertexList)
    val edgeRdd = sc.parallelize(edgeList)
    val g = Graph[HashMap[String, Int], HashMap[String, Int]](vertexRdd, edgeRdd)

    // run test code 
    val lpa = run(g)
    lpa.vertices.collect().foreach(println)
  }
}
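
For comparison, here is a minimal sketch of the workaround, assuming the same program as above: the vertex attribute type is switched to scala.collection.immutable.HashMap and vProg returns a new map instead of mutating the one it received. Only the parts that change are shown; sendMsg, mergeMsg and the graph construction stay the same apart from the import and the attribute type.

import org.apache.spark.graphx._
import scala.collection.immutable.HashMap

// vProg for the immutable variant: build and return a new map rather than
// mutating attr in place, so the shipped triplet views carry the new state
def vProg(v: VertexId, attr: HashMap[String, Int], msg: Integer): HashMap[String, Int] = {
  if (msg < 0) {
    // init message received: vertex 0 is the source, all others start at "infinity"
    if (v == 0L) attr + ("LENGTH" -> 0)
    else attr + ("LENGTH" -> Integer.MAX_VALUE)
  } else {
    // regular message: update the distance from the received value
    attr + ("LENGTH" -> (msg + 1))
  }
}

// vertices and edges are built the same way, just with the immutable map, e.g.
// (0.asInstanceOf[VertexId], HashMap.empty[String, Int])

With this version the collected vertices should show the expected LENGTH values after the run.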


