Re: How to set persistence level of graph in GraphX in spark 1.0.0
Hi Yifan LI,

I am currently working on Spark 1.0, in which we can't pass edgeStorageLevel as a parameter; GraphLoader implicitly caches the edges. So I am looking for a workaround.

http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.graphx.GraphLoader$

Regards,
Arpit

On Tue, Oct 28, 2014 at 4:25 PM, Yifan LI <iamyifa...@gmail.com> wrote:

> Hi Arpit,
>
> Try this:
>
> val graph = GraphLoader.edgeListFile(sc, edgesFile,
>   minEdgePartitions = numPartitions,
>   edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
>   vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
>
> Best,
> Yifan LI
>
> On 28 Oct 2014, at 11:17, Arpit Kumar <arp8...@gmail.com> wrote:
>
>> Any help regarding this issue please?
>>
>> Regards,
>> Arpit
>>
>> On Sat, Oct 25, 2014 at 8:56 AM, Arpit Kumar <arp8...@gmail.com> wrote:
>>
>>> [original question elided; it appears in full as "How to set persistence
>>> level of graph in GraphX in spark 1.0.0" below]

--
Arpit Kumar
Fourth Year Undergraduate
Department of Computer Science and Engineering
Indian Institute of Technology, Kharagpur
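[Editorial sketch, not from the thread] Since edgeListFile in Spark 1.0.0 takes no storage-level parameters, one possible direction is to bypass GraphLoader and parse the edge list by hand, then build the graph with Graph.fromEdgeTuples. The helper name below is hypothetical, and the caveat matters: this controls the storage level of the parsed-edge RDD, but GraphX 1.0 may still cache some of its internal RDDs at MEMORY_ONLY on its own.

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

// Hypothetical helper: parse the edge list manually so the parsed-edge RDD,
// at least, gets an explicit storage level.
def loadEdgeTuples(sc: SparkContext, path: String, numPartitions: Int): Graph[Int, Int] = {
  val rawEdges = sc.textFile(path, numPartitions)
    .filter(line => !line.isEmpty && line(0) != '#')  // skip blanks and comments
    .map { line =>
      val fields = line.split("\\s+")
      (fields(0).toLong, fields(1).toLong)            // (srcId, dstId)
    }
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
  // Note: GraphX may still cache its own internal RDDs at MEMORY_ONLY in 1.0.
  Graph.fromEdgeTuples(rawEdges, defaultValue = 1)
}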
Re: Workaround for SPARK-1931 not compiling
Thanks a lot. Now it is working properly.

On Sat, Oct 25, 2014 at 2:13 AM, Ankur Dave <ankurd...@gmail.com> wrote:

> At 2014-10-23 09:48:55 +0530, Arpit Kumar <arp8...@gmail.com> wrote:
>> error: value partitionBy is not a member of
>> org.apache.spark.rdd.RDD[(org.apache.spark.graphx.PartitionID, org.apache.spark.graphx.Edge[ED])]
>
> Since partitionBy is a member of PairRDDFunctions, it sounds like the
> implicit conversion from RDD to PairRDDFunctions is not getting applied.
> Does it help to import org.apache.spark.SparkContext._ before applying
> the workaround?
>
> Ankur

--
Arpit Kumar
Fourth Year Undergraduate
Department of Computer Science and Engineering
Indian Institute of Technology, Kharagpur
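[Editorial sketch, not from the thread] To make the moving part visible: partitionBy comes from PairRDDFunctions, and before Spark 1.3 the implicit conversion that adds those methods to an RDD of pairs had to be imported explicitly. A toy example (names are illustrative):

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._  // brings the RDD-to-PairRDDFunctions implicit into scope

// Without the import above, partitionBy is not a member of
// RDD[(Int, String)] and this fails to compile on Spark 1.0.
def example(sc: SparkContext): Unit = {
  val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
  val repartitioned = pairs.partitionBy(new HashPartitioner(4))
  println(repartitioned.partitions.size)  // 4
}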
How to set persistence level of graph in GraphX in spark 1.0.0
Hi all,

I am using the GraphLoader class to load graphs from edge list files, but then I need to change the storage level of the graph to something other than MEMORY_ONLY.

val graph = GraphLoader.edgeListFile(sc, fname,
  minEdgePartitions = numEPart).persist(StorageLevel.MEMORY_AND_DISK_SER)

The error I am getting while executing this is:

Exception in thread "main" java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level

Then I looked into the GraphLoader class. I know that the latest version of Spark supports setting the persistence level in this class. Please suggest a workaround for Spark 1.0.0, as I do not have the option to move to the latest release.

Note: I tried copying the GraphLoader class into my own package as GraphLoader1, importing

package com.cloudera.xyz

import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl._

and then changing the persistence level to suit my needs, using .persist(gStorageLevel) instead of .cache(). But while compiling I get the following error:

GraphLoader1.scala:49: error: class EdgePartitionBuilder in package impl cannot be accessed in package org.apache.spark.graphx.impl
[INFO]     val builder = new EdgePartitionBuilder[Int, Int]

I am also attaching the file with the mail. Maybe this way of doing things is not possible. Please suggest some workarounds so that I can set the persistence level to MEMORY_AND_DISK_SER for the graph I read from the edge list file.

package com.cloudera.sparkwordcount

import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl._

/**
 * Provides utilities for loading [[Graph]]s from files.
 */
object GraphLoader1 extends Logging {

  /**
   * Loads a graph from an edge list formatted file where each line contains two integers: a source
   * id and a target id. Skips lines that begin with `#`.
   *
   * If desired the edges can be automatically oriented in the positive
   * direction (source Id < target Id) by setting `canonicalOrientation` to
   * true.
   *
   * @example Loads a file in the following format:
   * {{{
   * # Comment Line
   * # Source Id <\t> Target Id
   * 1   -5
   * 1    2
   * 2    7
   * 1    8
   * }}}
   *
   * @param sc SparkContext
   * @param path the path to the file (e.g., /home/data/file or hdfs://file)
   * @param canonicalOrientation whether to orient edges in the positive direction
   * @param minEdgePartitions the number of partitions for the edge RDD
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int] = {
    val startTime = System.currentTimeMillis

    val gStorageLevel = StorageLevel.MEMORY_AND_DISK_SER

    // Parse the edge data table directly into edge partitions
    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
      val builder = new EdgePartitionBuilder[Int, Int]
      iter.foreach { line =>
        if (!line.isEmpty && line(0) != '#') {
          val lineArray = line.split("\\s+")
          if (lineArray.length < 2) {
            logWarning("Invalid line: " + line)
          }
          val srcId = lineArray(0).toLong
          val dstId = lineArray(1).toLong
          if (canonicalOrientation && srcId > dstId) {
            builder.add(dstId, srcId, 1)
          } else {
            builder.add(srcId, dstId, 1)
          }
        }
      }
      Iterator((pid, builder.toEdgePartition))
    }.persist(gStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
    edges.count()

    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
  } // end of edgeListFile
}
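[Editorial sketch, not from the thread] On the EdgePartitionBuilder access error: in Spark 1.0 that class is declared private[graphx], so a copy of GraphLoader living in com.cloudera.sparkwordcount cannot see it. One hypothetical (and fragile, since it leans on Spark internals) way around this is to declare the copied loader inside the org.apache.spark.graphx package, where the package-private members become visible:

// Hypothetical sketch: placing the copied loader under org.apache.spark.graphx
// makes private[graphx] classes such as EdgePartitionBuilder accessible.
// This depends on Spark internals and may break on any version upgrade.
package org.apache.spark.graphx

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.impl._

object GraphLoader1 extends Logging {
  // ... same edgeListFile body as above, with .persist(StorageLevel.MEMORY_AND_DISK_SER)
  // in place of .cache(); EdgePartitionBuilder now resolves because this
  // object lives inside the graphx package.
}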
Workaround for SPARK-1931 not compiling
Hi all,

I am new to Spark/GraphX and am trying to use partitioning strategies in GraphX on Spark 1.0.0. The workaround I saw on the main page does not seem to compile. The code I added was:

def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
  val numPartitions = edges.partitions.size
  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitions(_.map(_._2), preservesPartitioning = true)
}

val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
  minEdgePartitions = numEPart).cache()
val graph = Graph(unpartitionedGraph.vertices,
  partitionBy(unpartitionedGraph.edges, PartitionStrategy.EdgePartition2D))

The partitionBy function is the same as the workaround described in the official documentation. I am, however, getting the following error:

error: value partitionBy is not a member of
org.apache.spark.rdd.RDD[(org.apache.spark.graphx.PartitionID, org.apache.spark.graphx.Edge[ED])]
[INFO] possible cause: maybe a semicolon is missing before `value partitionBy'?
[INFO]     .partitionBy(new HashPartitioner(numPartitions))
[INFO]      ^

Please help me in resolving the error.

Note: I can't upgrade Spark since I am only a client on the Spark cluster.

--
Arpit Kumar
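[Editorial note] As Ankur's reply earlier in this digest points out, the missing piece is the import org.apache.spark.SparkContext._ line, which brings the RDD-to-PairRDDFunctions implicit conversion into scope so partitionBy resolves. For completeness: on a release where SPARK-1931 is fixed (the JIRA lists 1.0.1 and 1.1.0 as fix versions, though this cluster cannot be upgraded), the manual re-keying workaround should be unnecessary and the built-in method can be called directly. A sketch, reusing sc, fname, and numEPart from the message above:

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

// Spark 1.0.1+ sketch: Graph.partitionBy works once SPARK-1931 is fixed,
// so no manual re-keying of the edge RDD is needed.
val graph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart)
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .cache()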