Hi All,
/*
 * I am running a customized label propagation using Pregel. After a few iterations, the program
 * becomes slow and wastes a lot of time in mapPartitions (at GraphImpl.scala:184 or
 * VertexRDD.scala:318, or VertexRDD.scala:323). And the amount of shuffle write reaches 15GB,
 * while the size of the raw data with (srcid, dstid, weight) is only 30MB. I wonder whether this
 * is normal? Below please find my customized label propagation implementation. I have changed
 * "map" into "mapValues" in line 183 (the `mapToSend` statement inside `sendMessage`) to decrease
 * shuffling, but it makes no difference:
 */

/**
 * Runs a customized label propagation over `graph` using the GraphX Pregel operator.
 *
 * Every vertex of the input graph carries `(kind, labels, tag)`. Before iterating, each vertex is
 * extended to `(kind, labels, diff, inDegree, tag)` where `diff` starts at 1.0 and `inDegree`
 * comes from `graph.inDegrees` (0 when the vertex has no in-edges).
 * NOTE(review): the meaning of the `Int` kind flag and of the `String` tag is not visible in this
 * listing; only the values 1 and 3 of the flag are treated specially (see `sendMessage`).
 *
 * @param sc    Spark context; not used by the visible code, kept so existing callers still compile
 * @param graph input graph with `(kind, labels, tag)` vertex attributes and `Double` edge weights
 * @param tol   a destination vertex only receives messages while its `diff` is `>= tol`
 * @return the graph after at most 3 Pregel iterations, with the extended vertex attributes
 */
def adsorption(sc: SparkContext, graph: Graph[(Int, Map[VertexId, Double], String), Double], tol: Double)
    : Graph[(Int, Map[VertexId, Double], Double, Int, String), Double] = {
  // Local aliases only; the public signature above spells the types out in full.
  type Labels = Map[VertexId, Double]
  type VertexState = (Int, Labels, Double, Int, String)

  // Attach the initial diff (1.0) and the in-degree to every vertex.
  val adsorptionGraph: Graph[VertexState, Double] = graph
    .outerJoinVertices(graph.inDegrees) {
      case (_, u, inDegOpt) => (u._1, u._2, 1.0, inDegOpt.getOrElse(0), u._3)
    }
    .cache()

  /**
   * Sends the source vertex's label map, scaled by 1 / in-degree of the destination, to the
   * destination vertex. Nothing is sent unless the destination's diff is still `>= tol` and its
   * kind flag is 1 or 3.
   */
  def sendMessage(edge: EdgeTriplet[VertexState, Double]): Iterator[(VertexId, Labels)] = {
    val dstAttr = edge.dstAttr
    if (dstAttr._3 >= tol && (dstAttr._1 == 1 || dstAttr._1 == 3)) {
      val indegree = dstAttr._4.toDouble
      // The posted version used `mapValues{_/indegree}.map(identity)`: a lazy view followed by a
      // forced copy. A single strict `map` builds the same map in one step. This is a local Scala
      // Map, not an RDD, so choosing `map` or `mapValues` here does not affect Spark shuffling.
      val mapToSend = edge.srcAttr._2.map { case (label, weight) => label -> weight / indegree }
      Iterator((edge.dstId, mapToSend))
    } else {
      Iterator.empty
    }
  }

  /**
   * Combines two label maps by summing the weights of labels present in both; labels present in
   * only one map keep their weight.
   */
  def mergeMessage(label1: Labels, label2: Labels): Labels = {
    // The listing as posted bound the result to a local `val` and never returned it.
    // Fold the smaller map into the larger one so only the smaller side is traversed.
    val (smaller, larger) = if (label1.size <= label2.size) (label1, label2) else (label2, label1)
    smaller.foldLeft(larger) { case (merged, (label, weight)) =>
      merged.updated(label, merged.getOrElse(label, 0.0) + weight)
    }
  }

  /**
   * Replaces the vertex's labels with the normalized incoming message and records how many
   * labels are new relative to the previous label set. An empty message leaves the vertex as is.
   */
  def vertexProgram(vid: VertexId, attr: VertexState, message: Labels): VertexState =
    if (message.isEmpty) attr
    else {
      val oldlabel = attr._2
      val accum = message.values.sum
      // Normalize so the weights of the new label map sum to 1.
      val newlabel = message.map { case (label, weight) => label -> weight / accum }
      // Number of labels not seen before, relative to the size of the old label set.
      // When `oldlabel` is empty this Double division yields Infinity, which still passes the
      // `diff >= tol` check in sendMessage.
      val diff = (newlabel.keySet -- oldlabel.keySet).size.toDouble / oldlabel.size.toDouble
      (attr._1, newlabel, diff, attr._4, attr._5)
    }

  // NOTE(review): each message carries the sender's whole label map and merging unions the keys,
  // so per-vertex maps can only grow as labels spread. That growth is a likely contributor to the
  // shuffle volume described above - confirm by logging map sizes per iteration.

  // empty initial message
  val initialMessage = Map[VertexId, Double]()

  Pregel(adsorptionGraph, initialMessage, maxIterations = 3, activeDirection = EdgeDirection.In)(
    vprog = vertexProgram,
    sendMsg = sendMessage,
    mergeMsg = mergeMessage
  )
}