Hi All,

I am running a customized label propagation using Pregel. After a few 
iterations, the program becomes slow and spends a lot of time in mapPartitions 
(at GraphImpl.scala:184, VertexRDD.scala:318, or VertexRDD.scala:323). The 
amount of shuffle write reaches 15GB, while the raw data of 
(srcid, dstid, weight) triples is only 30MB.


Is this normal?


Below please find my customized label propagation implementation. I have 
changed "map" into "mapValues" in line 183 to decrease shuffling, but it makes 
no difference:


"
// adsorption: extends the vertex attributes of `graph` and runs Pregel on the result.
// Input vertex attribute:  (Int, Map[VertexId, Double], String); edge attribute: Double.
// Output vertex attribute: (Int, Map[VertexId, Double], Double, Int, String), i.e. the
// input tuple with a Double (initialised to 1.0) and the vertex in-degree inserted.
// `tol` is the threshold that sendMessage compares the destination's Double field against.
// NOTE(review): `sc` is not used in the lines shown. The leading numbers (154, 155, ...)
// are the poster's own file line numbers; gaps in them suggest lines were left out.
154     def adsorption(sc : SparkContext, graph : Graph[(Int, Map[VertexId, 
Double], String), Double], tol: Double)
155     : Graph[(Int, Map[VertexId, Double], Double, Int, String), Double] =
156     {
        // Extend each vertex attribute with 1.0 and its in-degree (0 when the vertex
        // has no entry in graph.inDegrees), then cache the resulting graph.
157         val adsorptionGraph: Graph[(Int, Map[VertexId, Double], Double, 
Int, String), Double] = graph
158         .outerJoinVertices(graph.inDegrees){
159             case (vid, u, inDegOpt) => (u._1, u._2, 1.0, 
inDegOpt.getOrElse(0), u._3)
160         }
162         .cache()
167 
        // Message sender: when the destination's third field is >= tol and its first
        // field is 1 or 3, sends the source's map, with every value divided by the
        // destination's in-degree, to edge.dstId; otherwise sends nothing.
168         def sendMessage(edge : EdgeTriplet[(Int, Map[VertexId, Double], 
Double, Int, String), Double])={
175             val dstAttr = edge.dstAttr
176 
177             if (dstAttr._3 >= tol && (dstAttr._1 == 1 || dstAttr._1 == 3))
178             {
181                 val indegree = dstAttr._4.toDouble
182 
                // NOTE(review): map(identity) after mapValues presumably forces the
                // mapValues result into a concrete Map -- confirm.
183                 val mapToSend = 
edge.srcAttr._2.mapValues{_/indegree}.map(identity)
187                 Iterator((edge.dstId, mapToSend))
188             }
189             else
190             {       
192                 Iterator.empty
193             }
194         }
195 
        // Message combiner: for every key present in either map, adds the two values,
        // treating a key missing from one side as 0.0.
        // NOTE(review): as posted, the body ends on the `val mm` definition; the
        // expression that returns the merged map is not visible here (the listing
        // jumps from 206 to 211).
196         def mergeMessage(label1:Map[VertexId, Double], label2:Map[VertexId, 
Double]): Map[VertexId, Double] =
197         {  
202             val mm = (label1.keySet ++ label2.keySet).map{i=>
203                 val count1Val = label1.getOrElse(i, 0.0)
204                 val count2Val = label2.getOrElse(i, 0.0)
205                 i->(count1Val + count2Val)
206             }.toMap
211         }
212 
        // Vertex program: returns attr unchanged when the message is empty. Otherwise
        // the message values are divided by their sum to form the new map, and the
        // third field becomes (number of keys in the new map that are absent from the
        // old map) / (number of keys in the old map). Fields 1, 4 and 5 are kept.
        // NOTE(review): when the old map has no keys, the Double division below is a
        // division by 0.0 -- confirm that this is intended.
213         def vertexProgram(vid: VertexId, attr: (Int, Map[VertexId, Double], 
Double, Int, String), message: Map[VertexId, Double])={
218 
219            if (message.isEmpty) attr
220            else
221            {
223 
224                val oldlabel = attr._2
227 
228                var accum = 0.0
229 
230                message.foreach(x=> (accum = accum + x._2))
233 
234                val newlabel = message.map(x=>(x._1->x._2/accum))
235 
236                val diff = 
(newlabel.keySet--oldlabel.keySet).toSet.size.toDouble / 
oldlabel.keySet.size.toDouble
239 
240                (attr._1, newlabel, diff, attr._4, attr._5)
241            }
242         }
243 
244         // empty initial message
245         val initialMessage = Map[VertexId, Double]()
246 
        // Run Pregel on the cached graph with the three functions above, starting from
        // the empty message, with maxIterations = 3 and activeDirection = EdgeDirection.In.
247         Pregel(adsorptionGraph, initialMessage, maxIterations = 3, 
activeDirection = EdgeDirection.In)(
248             vprog = vertexProgram,
249             sendMsg = sendMessage,
250             mergeMsg = mergeMessage
251         )
252     }
"

Reply via email to