Re: How to set persistence level of graph in GraphX in spark 1.0.0

2014-10-28 Thread Arpit Kumar
Hi Yifan LI,
I am currently working on Spark 1.0, in which we can't pass edgeStorageLevel
as a parameter; GraphLoader implicitly caches the edges. So I am looking for a
workaround.

http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.graphx.GraphLoader$
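
One direction I am considering (untested sketch; edgesFile and numPartitions
are placeholders) is to skip GraphLoader entirely: parse the edge list into a
plain RDD[Edge[Int]], persist it at the level I want, and then build the graph
with the public Graph.fromEdges constructor. I am not sure whether the RDDs
that Graph creates internally would still be cached at MEMORY_ONLY, though:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.storage.StorageLevel

// Parse the edge list ourselves so that we, not GraphLoader's implicit
// cache(), decide the storage level of the parsed edges.
val edges = sc.textFile(edgesFile, numPartitions)
  .filter(line => !line.isEmpty && line(0) != '#') // skip blanks and comments
  .map { line =>
    val fields = line.split("\\s+")
    Edge(fields(0).toLong, fields(1).toLong, 1) // edge attribute 1, like GraphLoader
  }
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

val graph = Graph.fromEdges(edges, defaultValue = 1)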

Regards,
Arpit

On Tue, Oct 28, 2014 at 4:25 PM, Yifan LI iamyifa...@gmail.com wrote:

 Hi Arpit,

 Try this:

 val graph = GraphLoader.edgeListFile(sc, edgesFile,
   minEdgePartitions = numPartitions,
   edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
   vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)


 Best,
 Yifan LI




 On 28 Oct 2014, at 11:17, Arpit Kumar arp8...@gmail.com wrote:

 Any help regarding this issue please?

 Regards,
 Arpit

 On Sat, Oct 25, 2014 at 8:56 AM, Arpit Kumar arp8...@gmail.com wrote:

 Hi all,
 I am using the GraphLoader class to load graphs from edge list files, but
 then I need to change the storage level of the graph to something other
 than MEMORY_ONLY.

 val graph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart)
   .persist(StorageLevel.MEMORY_AND_DISK_SER)

 The error I am getting while executing this is:
 Exception in thread "main" java.lang.UnsupportedOperationException:
 Cannot change storage level of an RDD after it was already assigned a level


 Then I looked into the GraphLoader class. I know that the latest Spark
 release adds support for setting the persistence level in this class.
 Please suggest a workaround for Spark 1.0.0, as I do not have the option
 to move to the latest release.

 Note: I tried copying the GraphLoader class into my package as GraphLoader1,
 importing

 package com.cloudera.xyz

 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.graphx._
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.graphx.impl._

 and then changing the persistence level to suit my needs by calling
 .persist(gStorageLevel) instead of .cache().

 But while compiling I get the following error:

 GraphLoader1.scala:49: error: class EdgePartitionBuilder in package impl
 cannot be accessed in package org.apache.spark.graphx.impl
 [INFO]   val builder = new EdgePartitionBuilder[Int, Int]

 I am also attaching the file with this mail. Maybe this way of doing things
 is not possible.


 Please suggest a workaround so that I can set the persistence level of the
 graph I read from an edge list file to MEMORY_AND_DISK_SER.




 --
 Arpit Kumar
 Fourth Year Undergraduate
 Department of Computer Science and Engineering
 Indian Institute of Technology, Kharagpur





-- 
Arpit Kumar
Fourth Year Undergraduate
Department of Computer Science and Engineering
Indian Institute of Technology, Kharagpur


Re: Workaround for SPARK-1931 not compiling

2014-10-24 Thread Arpit Kumar
Thanks a lot. Now it is working properly.

On Sat, Oct 25, 2014 at 2:13 AM, Ankur Dave ankurd...@gmail.com wrote:

 At 2014-10-23 09:48:55 +0530, Arpit Kumar arp8...@gmail.com wrote:
  error: value partitionBy is not a member of
  org.apache.spark.rdd.RDD[(org.apache.spark.graphx.PartitionID,
  org.apache.spark.graphx.Edge[ED])]

 Since partitionBy is a member of PairRDDFunctions, it sounds like the
 implicit conversion from RDD to PairRDDFunctions is not getting applied.
 Does it help to import org.apache.spark.SparkContext._ before applying
 the workaround?
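
 i.e., with something like this at the top of the file that defines the
 workaround (untested sketch):

 import org.apache.spark.HashPartitioner
 import org.apache.spark.SparkContext._ // RDD-to-PairRDDFunctions implicit lives here
 import org.apache.spark.graphx._
 import org.apache.spark.rdd.RDD

 def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
   val numPartitions = edges.partitions.size
   edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
     .partitionBy(new HashPartitioner(numPartitions)) // should now resolve via PairRDDFunctions
     .mapPartitions(_.map(_._2), preservesPartitioning = true)
 }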

 Ankur




-- 
Arpit Kumar
Fourth Year Undergraduate
Department of Computer Science and Engineering
Indian Institute of Technology, Kharagpur


How to set persistence level of graph in GraphX in spark 1.0.0

2014-10-24 Thread Arpit Kumar
Hi all,
I am using the GraphLoader class to load graphs from edge list files, but
then I need to change the storage level of the graph to something other than
MEMORY_ONLY.

val graph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

The error I am getting while executing this is:
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot
change storage level of an RDD after it was already assigned a level


Then I looked into the GraphLoader class. I know that the latest Spark release
adds support for setting the persistence level in this class. Please suggest a
workaround for Spark 1.0.0, as I do not have the option to move to the latest
release.

Note: I tried copying the GraphLoader class into my package as GraphLoader1,
importing

package com.cloudera.xyz

import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl._

and then changing the persistence level to suit my needs by calling
.persist(gStorageLevel) instead of .cache().

But while compiling I get the following error:

GraphLoader1.scala:49: error: class EdgePartitionBuilder in package impl
cannot be accessed in package org.apache.spark.graphx.impl
[INFO]   val builder = new EdgePartitionBuilder[Int, Int]

I am also attaching the file with this mail. Maybe this way of doing things
is not possible.
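
One hack I have come across but not tried: EdgePartitionBuilder appears to be
declared private[graphx], so the copied file might compile if it is declared
inside that package rather than in mine, e.g.:

// Untested: declaring the copied loader inside org.apache.spark.graphx so
// that private[graphx] members such as EdgePartitionBuilder become visible.
// Fragile and unsupported, but it may unblock compilation on 1.0.0.
package org.apache.spark.graphx

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl._

object GraphLoader1 extends Logging {
  // ... same edgeListFile body as in the attached file ...
}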


Please suggest a workaround so that I can set the persistence level of the
graph I read from an edge list file to MEMORY_AND_DISK_SER.

package com.cloudera.sparkwordcount

import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl._

/**
 * Provides utilities for loading [[Graph]]s from files.
 */
object GraphLoader1 extends Logging {

  /**
   * Loads a graph from an edge list formatted file where each line contains two integers: a source
   * id and a target id. Skips lines that begin with `#`.
   *
   * If desired the edges can be automatically oriented in the positive
   * direction (source Id < target Id) by setting `canonicalOrientation` to
   * true.
   *
   * @example Loads a file in the following format:
   * {{{
   * # Comment Line
   * # Source Id <\t> Target Id
   * 1   -5
   * 1    2
   * 2    7
   * 1    8
   * }}}
   *
   * @param sc SparkContext
   * @param path the path to the file (e.g., /home/data/file or hdfs://file)
   * @param canonicalOrientation whether to orient edges in the positive
   *        direction
   * @param minEdgePartitions the number of partitions for the edge RDD
   */
  def edgeListFile(
  sc: SparkContext,
  path: String,
  canonicalOrientation: Boolean = false,
  minEdgePartitions: Int = 1)
: Graph[Int, Int] =
  {
val startTime = System.currentTimeMillis
val gStorageLevel = StorageLevel.MEMORY_AND_DISK_SER
// Parse the edge data table directly into edge partitions
val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
      val builder = new EdgePartitionBuilder[Int, Int]
      iter.foreach { line =>
        if (!line.isEmpty && line(0) != '#') {
          val lineArray = line.split("\\s+")
          if (lineArray.length < 2) {
            logWarning("Invalid line: " + line)
          }
          val srcId = lineArray(0).toLong
          val dstId = lineArray(1).toLong
          if (canonicalOrientation && srcId > dstId) {
            builder.add(dstId, srcId, 1)
          } else {
            builder.add(srcId, dstId, 1)
          }
        }
      }
      Iterator((pid, builder.toEdgePartition))
    }.persist(gStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
edges.count()

    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
  } // end of edgeListFile

}

Workaround for SPARK-1931 not compiling

2014-10-22 Thread Arpit Kumar
Hi all,
I am new to Spark/GraphX and am trying to use partitioning strategies in
GraphX on Spark 1.0.0.

The workaround I saw on the main page does not seem to compile. The code I
added was:

def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
  val numPartitions = edges.partitions.size
  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitions(_.map(_._2), preservesPartitioning = true)
}

val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
  minEdgePartitions = numEPart).cache()

val graph = Graph(unpartitionedGraph.vertices,
partitionBy(unpartitionedGraph.edges, PartitionStrategy.EdgePartition2D))


The partitionBy function is the same as the workaround described in the
official documentation.

I am, however, getting the following error:
error: value partitionBy is not a member of
org.apache.spark.rdd.RDD[(org.apache.spark.graphx.PartitionID,
org.apache.spark.graphx.Edge[ED])]
[INFO] possible cause: maybe a semicolon is missing before `value
partitionBy'?
[INFO] .partitionBy(new HashPartitioner(numPartitions))
[INFO]  ^


Please help me in resolving this error. Note: I can't upgrade Spark since I am
only a client on the Spark cluster.

-- 
Arpit Kumar