Costs of transformations

2015-06-09 Thread Vijayasarathy Kannan
Is it possible to bound the costs of operations such as flatMap() and collect()
based on the size of the RDDs?


Re: Reading large files

2015-05-06 Thread Vijayasarathy Kannan
Thanks.

In both cases, does the driver need to have enough memory to hold the entire
file? How do these functions behave when, for example, the binary file is 4 GB
and the available driver memory is less than that?
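
For reference, a minimal sketch of the two read paths (Spark 1.x Scala API assumed; the paths, the part-naming scheme, and the 40-byte record size are placeholders). As far as I understand, neither call materializes the file on the driver: the driver only plans the splits, and each executor reads its own portion.

// binaryFiles: one (path, stream) pair per file; order across parts is not
// guaranteed, hence the sortBy on an index parsed from the file name.
def partIndex(path: String): Int = path.split("-").last.toInt  // assumes names like part-00007
val parts = sc.binaryFiles("hdfs:///data/bigfile-parts/*")
val ordered = parts
  .map { case (path, stream) => (partIndex(path), stream.toArray()) }
  .sortBy { case (idx, _) => idx }

// binaryRecords: one flat binary file of fixed-length records, split across executors.
val records = sc.binaryRecords("hdfs:///data/bigfile.bin", 40)  // 40 = bytes per record (placeholder)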

On Wed, May 6, 2015 at 1:54 PM, Ulanov, Alexander alexander.ula...@hp.com
wrote:

  SparkContext has two methods for reading binary files: binaryFiles
 (reads multiple binary files into an RDD) and binaryRecords (reads fixed-size
 records of a single binary file into an RDD). For example, I have a big binary
 file split into logical parts, so I can use “binaryFiles”. The possible
 problem is that the order of records between parts is not preserved, so I
 have to do sortBy afterwards.



 Alexander



 *From:* Vijayasarathy Kannan [mailto:kvi...@vt.edu]
 *Sent:* Wednesday, May 06, 2015 10:38 AM
 *To:* user@spark.apache.org
 *Subject:* Reading large files



 ​Hi,



 Is there a way to read a large file in a parallel/distributed way? I have
 a single large binary file which I currently read on the driver program and
 then distribute to the executors (using groupBy(), etc.). I want to know if
 there's a way to make each executor read a specific/unique portion of the
 file, or to create RDDs from multiple portions of the file and finally union
 them.



 Thanks.



Reading large files

2015-05-06 Thread Vijayasarathy Kannan
​Hi,

Is there a way to read a large file in a parallel/distributed way? I have a
single large binary file which I currently read on the driver program and then
distribute to the executors (using groupBy(), etc.). I want to know if there's
a way to make each executor read a specific/unique portion of the file, or to
create RDDs from multiple portions of the file and finally union them.

Thanks.


Spark JVM default memory

2015-05-04 Thread Vijayasarathy Kannan
Starting the master with /sbin/start-master.sh creates a JVM with only
512 MB of memory. How do I change this default amount of memory?

Thanks,
Vijay


Re: Spark JVM default memory

2015-05-04 Thread Vijayasarathy Kannan
I am not able to access the web UI for some reason. But the logs (written
while my application runs) show that only 385 MB are being allocated for each
executor (on the slave nodes), while the executor memory I set is 16 GB. This
385 MB is not the same for each run either; it looks random (sometimes 1 GB,
sometimes 512 MB, etc.)

On Mon, May 4, 2015 at 6:57 PM, Mohammed Guller moham...@glassbeam.com
wrote:

  Did you confirm through the Spark UI how much memory is getting
 allocated to your application on each worker?



 Mohammed



 *From:* Vijayasarathy Kannan [mailto:kvi...@vt.edu]
 *Sent:* Monday, May 4, 2015 3:36 PM
 *To:* Andrew Ash
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark JVM default memory



 I am trying to read in a 4 GB file. I tried setting both
 spark.driver.memory and spark.executor.memory to large values (say
 16 GB), but I still get a GC limit exceeded error. Any idea what I am missing?



 On Mon, May 4, 2015 at 5:30 PM, Andrew Ash and...@andrewash.com wrote:

 It's unlikely you need to increase the amount of memory on your master
 node since it does simple bookkeeping.  The majority of the memory pressure
 across a cluster is on executor nodes.



 See the conf/spark-env.sh file for configuring heap sizes, and this
 section in the docs for more information on how to make these changes:
 http://spark.apache.org/docs/latest/configuration.html



 On Mon, May 4, 2015 at 2:24 PM, Vijayasarathy Kannan kvi...@vt.edu
 wrote:

 Starting the master with /sbin/start-master.sh creates a JVM with only
 512 MB of memory. How do I change this default amount of memory?



 Thanks,

 Vijay







Re: Spark JVM default memory

2015-05-04 Thread Vijayasarathy Kannan
I am trying to read in a 4 GB file. I tried setting both
spark.driver.memory and spark.executor.memory to large values (say
16 GB), but I still get a GC limit exceeded error. Any idea what I am missing?
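
For reference, a hedged sketch of how those two settings can be supplied from code (Spark 1.x assumed). One caveat: spark.driver.memory only takes effect if it is set before the driver JVM starts, so with spark-submit it is usually passed as --driver-memory rather than in the SparkConf.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("ReadLargeFile")            // app name is a placeholder
  .set("spark.executor.memory", "16g")
  .set("spark.driver.memory", "16g")      // effective only if set before the driver JVM starts
val sc = new SparkContext(conf)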

On Mon, May 4, 2015 at 5:30 PM, Andrew Ash and...@andrewash.com wrote:

 It's unlikely you need to increase the amount of memory on your master
 node since it does simple bookkeeping.  The majority of the memory pressure
 across a cluster is on executor nodes.

 See the conf/spark-env.sh file for configuring heap sizes, and this
 section in the docs for more information on how to make these changes:
 http://spark.apache.org/docs/latest/configuration.html

 On Mon, May 4, 2015 at 2:24 PM, Vijayasarathy Kannan kvi...@vt.edu
 wrote:

 Starting the master with /sbin/start-master.sh creates a JVM with only
 512 MB of memory. How do I change this default amount of memory?

 Thanks,
 Vijay





Complexity of transformations in Spark

2015-04-26 Thread Vijayasarathy Kannan
What is the complexity of transformations and actions in Spark, such as
groupBy(), flatMap(), collect(), etc.?

What attributes (such as the number of partitions) do we need to factor in
when analyzing code that uses these operations?
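
A small sketch of the attributes usually inspected when reasoning about these costs (Spark 1.x Scala API; the data below is a placeholder): the number of partitions bounds the parallelism of a stage, groupBy()/groupByKey() shuffle all values of a key into a single task so their cost grows with the largest group, and collect() moves the entire result to the driver.

val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, i))  // placeholder data
val grouped = pairs.groupByKey()
println(grouped.partitions.size)  // number of tasks in the stage that computes it
println(grouped.toDebugString)    // lineage; the ShuffledRDD entry marks the groupBy shuffle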


Error running Spark on Cloudera

2015-04-08 Thread Vijayasarathy Kannan
I am trying to run a Spark application using spark-submit on a cluster
using Cloudera manager. I get the error

Exception in thread "main" java.io.IOException: Error in creating log
directory: file:/user/spark/applicationHistory//app-20150408094126-0008

Adding the lines below to /etc/spark/conf/spark-defaults.conf didn't
resolve it.

*spark.eventLog.dir=/user/spark/applicationHistory*

*spark.eventLog.enabled=true*

Any idea on what is missing?
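
A hedged guess based on the file: scheme in the error: the path is being resolved against the local filesystem, where /user/spark/applicationHistory does not exist. One commonly used variant is to point the setting at a pre-created HDFS directory (the URI and namenode address below are placeholders); the same properties can equally live in spark-defaults.conf.

val conf = new org.apache.spark.SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs://namenode:8020/user/spark/applicationHistory")  // must already exist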


Re: Reading a large file (binary) into RDD

2015-04-03 Thread Vijayasarathy Kannan
Thanks everyone for the inputs.

I guess I will try out a custom implementation of InputFormat. But I have
no idea where to start. Are there any code examples of this that might help?
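
Not a custom InputFormat itself, but a hedged sketch of the Hadoop-file plumbing that route involves, using Hadoop 2.x's built-in FixedLengthInputFormat as a starting point (record length and path are placeholders; a hand-written InputFormat would plug into the same call with its own class):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat

val hadoopConf = new Configuration(sc.hadoopConfiguration)
FixedLengthInputFormat.setRecordLength(hadoopConf, 16)  // bytes per record (placeholder)

val raw = sc.newAPIHadoopFile(
  "hdfs:///data/graph.bin",               // placeholder path
  classOf[FixedLengthInputFormat],
  classOf[LongWritable],
  classOf[BytesWritable],
  hadoopConf)

// Hadoop reuses the Writable objects, so copy the bytes out before caching or shuffling.
val records = raw.map { case (offset, value) => (offset.get, value.copyBytes()) }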

On Fri, Apr 3, 2015 at 9:15 AM, Dean Wampler deanwamp...@gmail.com wrote:

 This might be overkill for your needs, but the scodec parser combinator
 library might be useful for creating a parser.

 https://github.com/scodec/scodec

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 2, 2015 at 6:53 PM, java8964 java8...@hotmail.com wrote:

 I think implementing your own InputFormat and using
 SparkContext.hadoopFile() is the best option for your case.

 Yong

 --
 From: kvi...@vt.edu
 Date: Thu, 2 Apr 2015 17:31:30 -0400
 Subject: Re: Reading a large file (binary) into RDD
 To: freeman.jer...@gmail.com
 CC: user@spark.apache.org


 The file has a specific structure. I outline it below.

 The input file is basically a representation of a graph.

 INT
 INT(A)
 LONG (B)
 A INTs(Degrees)
 A SHORTINTs  (Vertex_Attribute)
 B INTs
 B INTs
 B SHORTINTs
 B SHORTINTs

 A - number of vertices
 B - number of edges (note that the INTs/SHORTINTs associated with this
 are edge attributes)

 After reading in the file, I need to create two RDDs (one with vertices
 and the other with edges)

 On Thu, Apr 2, 2015 at 4:46 PM, Jeremy Freeman freeman.jer...@gmail.com
 wrote:

 Hm, that will indeed be trickier because this method assumes records are
 the same byte size. Is the file an arbitrary sequence of mixed types, or is
 there structure, e.g. short, long, short, long, etc.?

 If you could post a gist with an example of the kind of file and how it
 should look once read in that would be useful!

 -
 jeremyfreeman.net
 @thefreemanlab

 On Apr 2, 2015, at 2:09 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

 Thanks for the reply. Unfortunately, in my case, the binary file is a mix
 of short and long integers. Is there any other way that could be of use here?

 My current method happens to have a large overhead (much more than the actual
 computation time). Also, I am short of memory at the driver when it has to
 read the entire file.

 On Thu, Apr 2, 2015 at 1:44 PM, Jeremy Freeman freeman.jer...@gmail.com
 wrote:

 If it’s a flat binary file and each record is the same length (in bytes),
 you can use Spark’s binaryRecords method (defined on the SparkContext),
 which loads records from one or more large flat binary files into an RDD.
 Here’s an example in python to show how it works:

 # write data from an array
 from numpy import random
 dat = random.randn(100,5)
 f = open('test.bin', 'wb')
 f.write(dat)
 f.close()


 # load the data back in

 from numpy import frombuffer

 nrecords = 5
 bytesize = 8
 recordsize = nrecords * bytesize
 data = sc.binaryRecords('test.bin', recordsize)
 parsed = data.map(lambda v: frombuffer(buffer(v, 0, recordsize), 'float'))


 # these should be equal
 parsed.first()
 dat[0,:]


 Does that help?

 -
 jeremyfreeman.net
 @thefreemanlab

 On Apr 2, 2015, at 1:33 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

 What are some efficient ways to read a large file into RDDs?

 For example, have several executors read a specific/unique portion of the
 file and construct RDDs. Is this possible to do in Spark?

 Currently, I am doing a line-by-line read of the file at the driver and
 constructing the RDD.









Re: Reading a large file (binary) into RDD

2015-04-02 Thread Vijayasarathy Kannan
Thanks for the reply. Unfortunately, in my case, the binary file is a mix
of short and long integers. Is there any other way that could be of use here?

My current method happens to have a large overhead (much more than the actual
computation time). Also, I am short of memory at the driver when it has to
read the entire file.

On Thu, Apr 2, 2015 at 1:44 PM, Jeremy Freeman freeman.jer...@gmail.com
wrote:

 If it’s a flat binary file and each record is the same length (in bytes),
 you can use Spark’s binaryRecords method (defined on the SparkContext),
 which loads records from one or more large flat binary files into an RDD.
 Here’s an example in python to show how it works:

 # write data from an array
 from numpy import random
 dat = random.randn(100,5)
 f = open('test.bin', 'wb')
 f.write(dat)
 f.close()


 # load the data back in

 from numpy import frombuffer

 nrecords = 5
 bytesize = 8
 recordsize = nrecords * bytesize
 data = sc.binaryRecords('test.bin', recordsize)
 parsed = data.map(lambda v: frombuffer(buffer(v, 0, recordsize), 'float'))


 # these should be equal
 parsed.first()
 dat[0,:]


 Does that help?

 -
 jeremyfreeman.net
 @thefreemanlab

 On Apr 2, 2015, at 1:33 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

 What are some efficient ways to read a large file into RDDs?

 For example, have several executors read a specific/unique portion of the
 file and construct RDDs. Is this possible to do in Spark?

 Currently, I am doing a line-by-line read of the file at the driver and
 constructing the RDD.





Reading a large file (binary) into RDD

2015-04-02 Thread Vijayasarathy Kannan
What are some efficient ways to read a large file into RDDs?

For example, have several executors read a specific/unique portion of the
file and construct RDDs. Is this possible to do in Spark?

Currently, I am doing a line-by-line read of the file at the driver and
constructing the RDD.


Re: Reading a large file (binary) into RDD

2015-04-02 Thread Vijayasarathy Kannan
The file has a specific structure. I outline it below.

The input file is basically a representation of a graph.

INT
INT(A)
LONG (B)
A INTs(Degrees)
A SHORTINTs  (Vertex_Attribute)
B INTs
B INTs
B SHORTINTs
B SHORTINTs

A - number of vertices
B - number of edges (note that the INTs/SHORTINTs associated with this are
edge attributes)

After reading in the file, I need to create two RDDs (one with vertices and
the other with edges)
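
A hedged sketch of one way to bootstrap this layout (assuming big-endian data, 4-byte INTs and 2-byte SHORTINTs): read only the small header on the driver to learn A and B; the byte offset and size of every section then follow from the counts, and the fixed-size sections can be read in a distributed way (e.g. via binaryRecords over the relevant byte ranges) instead of on the driver.

import java.io.{DataInputStream, FileInputStream}

val in = new DataInputStream(new FileInputStream("graph.bin"))  // placeholder path
val header      = in.readInt()   // leading INT
val numVertices = in.readInt()   // A
val numEdges    = in.readLong()  // B
in.close()

// Section sizes in bytes, following the layout above.
val degreeBytes     = numVertices * 4L  // A INTs (degrees)
val vertexAttrBytes = numVertices * 2L  // A SHORTINTs (vertex attributes)
val edgeIntBytes    = numEdges * 4L     // each of the two B-INT sections
val edgeShortBytes  = numEdges * 2L     // each of the two B-SHORTINT sections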

On Thu, Apr 2, 2015 at 4:46 PM, Jeremy Freeman freeman.jer...@gmail.com
wrote:

 Hm, that will indeed be trickier because this method assumes records are
 the same byte size. Is the file an arbitrary sequence of mixed types, or is
 there structure, e.g. short, long, short, long, etc.?

 If you could post a gist with an example of the kind of file and how it
 should look once read in that would be useful!

 -
 jeremyfreeman.net
 @thefreemanlab

 On Apr 2, 2015, at 2:09 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

 Thanks for the reply. Unfortunately, in my case, the binary file is a mix
 of short and long integers. Is there any other way that could be of use here?

 My current method happens to have a large overhead (much more than the actual
 computation time). Also, I am short of memory at the driver when it has to
 read the entire file.

 On Thu, Apr 2, 2015 at 1:44 PM, Jeremy Freeman freeman.jer...@gmail.com
 wrote:

 If it’s a flat binary file and each record is the same length (in bytes),
 you can use Spark’s binaryRecords method (defined on the SparkContext),
 which loads records from one or more large flat binary files into an RDD.
 Here’s an example in python to show how it works:

 # write data from an array
 from numpy import random
 dat = random.randn(100,5)
 f = open('test.bin', 'wb')
 f.write(dat)
 f.close()


 # load the data back in

 from numpy import frombuffer

 nrecords = 5
 bytesize = 8
 recordsize = nrecords * bytesize
 data = sc.binaryRecords('test.bin', recordsize)
 parsed = data.map(lambda v: frombuffer(buffer(v, 0, recordsize), 'float'))


 # these should be equal
 parsed.first()
 dat[0,:]


 Does that help?

 -
 jeremyfreeman.net
 @thefreemanlab

 On Apr 2, 2015, at 1:33 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

 What are some efficient ways to read a large file into RDDs?

 For example, have several executors read a specific/unique portion of the
 file and construct RDDs. Is this possible to do in Spark?

 Currently, I am doing a line-by-line read of the file at the driver and
 constructing the RDD.







Re: Unable to run Spark application

2015-04-01 Thread Vijayasarathy Kannan
That is failing too, with

sbt.ResolveException: unresolved
dependency: org.apache.spark#spark-network-common_2.10;1.2.1



On Wed, Apr 1, 2015 at 1:24 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Try sbt assembly instead.

 On Wed, Apr 1, 2015 at 10:09 AM, Vijayasarathy Kannan kvi...@vt.edu
 wrote:
  Why do I get "Failed to find Spark assembly JAR. You need to build Spark
  before running this program."?

  I downloaded spark-1.2.1.tgz from the downloads page and extracted it.
  When I run sbt package inside my application, it works fine. But when I
  try to run my application, I get the above-mentioned error.
 
 



 --
 Marcelo



Unable to run Spark application

2015-04-01 Thread Vijayasarathy Kannan
Why do I get "Failed to find Spark assembly JAR. You need to build Spark
before running this program."?

I downloaded spark-1.2.1.tgz from the downloads page and extracted it.
When I run sbt package inside my application, it works fine. But when I
try to run my application, I get the above-mentioned error.


Re: Unable to run Spark application

2015-04-01 Thread Vijayasarathy Kannan
Managed to make sbt assembly work.

I run into another issue now. When I do ./sbin/start-all.sh, the script
fails saying JAVA_HOME is not set although I have explicitly set that
variable to point to the correct Java location. Same happens with
./sbin/start-master.sh script. Any idea what I might be missing?

On Wed, Apr 1, 2015 at 1:32 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

 That is failing too, with

 sbt.ResolveException: unresolved
 dependency: org.apache.spark#spark-network-common_2.10;1.2.1



 On Wed, Apr 1, 2015 at 1:24 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Try sbt assembly instead.

 On Wed, Apr 1, 2015 at 10:09 AM, Vijayasarathy Kannan kvi...@vt.edu
 wrote:
  Why do I get "Failed to find Spark assembly JAR. You need to build Spark
  before running this program."?

  I downloaded spark-1.2.1.tgz from the downloads page and extracted it.
  When I run sbt package, it works fine. But when I
  try to run my application, I get the above-mentioned error.
 
 



 --
 Marcelo





Problems with spark.akka.frameSize

2015-03-19 Thread Vijayasarathy Kannan
Hi,

I am encountering the following error with a Spark application.

Exception in thread "main" org.apache.spark.SparkException:
Job aborted due to stage failure:
Serialized task 0:0 was 11257268 bytes, which exceeds max allowed:
spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes).
Consider increasing spark.akka.frameSize or using broadcast variables for
large values.

I am doing the following in my code.

var groupedEdges =
graph.edges.groupBy[VertexId](groupBySrcId).persist(StorageLevel.MEMORY_AND_DISK)

while (condition) {
  var updates = groupedEdges.flatMap {
    edgesBySrc => doWork(edgesBySrc, a, b)
  }

  updates.collect.foreach(println)
}

def doWork(edges: (VertexId, Iterable[Edge[(Int, Int, Int)]]), a: Double,
b: Double): List[VertexId] = {
  // do something with edges and return a list of vertices
}

Note that the attribute of each edge is a tuple with 3 elements. I
encountered the above mentioned error only when having the edge attribute
as a tuple. The code doesn't run into this when the edge attribute is a
single integer.

I tried increasing *spark.akka.frameSize* (to say 100) and it worked
without running into this issue. Doing a broadcast does not seem
appropriate because each task performing doWork() gets a different set of
edges. However, the groups of edges remain the same throughout.

I was wondering if there is an efficient way to do what I'm doing, i.e., pass
edgesBySrc efficiently to doWork() (or not pass it at all, or pass it just once
the first time and have the tasks retain their sets of edges across
iterations)?
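
For completeness, a minimal sketch of where that setting goes (Spark 1.x; the value is interpreted in MB and defaults to 10, and it has to be in place before the SparkContext is created):

val conf = new org.apache.spark.SparkConf()
  .set("spark.akka.frameSize", "100")  // MB
val sc = new org.apache.spark.SparkContext(conf)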

Thanks


Issues with SBT and Spark

2015-03-19 Thread Vijayasarathy Kannan
My current simple.sbt is

name := "SparkEpiFast"

version := "1.0"

scalaVersion := "2.11.4"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.2.1" %
"provided"

libraryDependencies += "org.apache.spark" % "spark-graphx_2.11" % "1.2.1" %
"provided"

When I run sbt package, it compiles successfully. But when I run the
application, I get
Exception in thread "main" java.lang.NoSuchMethodError:
scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;

However, changing the scala version to 2.10.4 and updating the dependency
lines appropriately resolves the issue (no exception).

Could anyone please point out what I am doing wrong?
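
For what it's worth, a hedged sketch of a 2.10-based simple.sbt that matches a Spark 1.2.1 distribution built for Scala 2.10 (the prebuilt downloads of that era were 2.10 builds, and a NoSuchMethodError on Predef$.ArrowAssoc is a classic symptom of mixing Scala binary versions); using %% keeps the artifact suffix in step with scalaVersion:

name := "SparkEpiFast"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-graphx" % "1.2.1" % "provided"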


Question on RDD groupBy and executors

2015-03-17 Thread Vijayasarathy Kannan
Hi,

I am doing a groupBy on an EdgeRDD like this,

val groupedEdges = graph.edges.groupBy[VertexId](func0)
while(true) {
  val info = groupedEdges.flatMap(func1).collect.foreach(func2)
}

The groupBy distributes the data to different executors on different nodes
in the cluster.

Given a key K (a vertexId identifying a particular group in *groupedEdges*),
is there a way to find details such as
- which executor is responsible for K?
- which node in the cluster the executor containing K resides on?
- access that specific executor (and possibly assign a task) from the
driver?

Thanks.
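
As far as I know there is no public API to ask the scheduler which executor owns a given key, but a hedged sketch for observing the current placement is to tag each key with its partition index and the hostname of the executor holding it (observational only; the assignment can change if the RDD is recomputed):

import java.net.InetAddress

val placement = groupedEdges.mapPartitionsWithIndex { (partId, iter) =>
  val host = InetAddress.getLocalHost.getHostName
  iter.map { case (vertexId, _) => (vertexId, partId, host) }
}
placement.collect().foreach(println)  // (K, partition index, executor host)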


PRNG in Scala

2015-03-03 Thread Vijayasarathy Kannan
Hi,

What pseudo-random-number generator does scala.util.Random use?


Re: Iterating on RDDs

2015-02-27 Thread Vijayasarathy Kannan
As you suggested, I saved the grouped RDD and persisted it in memory before
the iterations begin. The performance seems to be much better now.

My previous comment that the run times doubled was based on a wrong observation.

Thanks.


On Fri, Feb 27, 2015 at 10:27 AM, Vijayasarathy Kannan kvi...@vt.edu
wrote:

 Thanks.

 I tried persist() on the RDD. The runtimes appear to have doubled now
 (without persist() it was ~7s per iteration and now it's ~15s). I am running
 standalone Spark on an 8-core machine.
 Any thoughts on why the increase in runtime?

 On Thu, Feb 26, 2015 at 4:27 PM, Imran Rashid iras...@cloudera.com
 wrote:


 val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
 // or whatever persistence makes more sense for you ...
 while(true) {
   val res = grouped.flatMap(F)
   res.collect.foreach(func)
   if(criteria)
     break
 }

 On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan kvi...@vt.edu
 wrote:

 Hi,

 I have the following use case.

 (1) I have an RDD of edges of a graph (say R).
 (2) do a groupBy on R (by say source vertex) and call a function F on
 each group.
 (3) collect the results from Fs and do some computation
 (4) repeat the above steps until some criteria is met

 In (2), the groups are always going to be the same (since R is grouped
 by source vertex).

 Question:
 Is R distributed every iteration (when in (2)) or is it distributed only
 once when it is created?

 A sample code snippet is below.

 while(true) {
   val res = R.groupBy[VertexId](G).flatMap(F)
   res.collect.foreach(func)
   if(criteria)
     break
 }

 Since the groups remain the same, what is the best way to go about
 implementing the above logic?






Re: Iterating on RDDs

2015-02-27 Thread Vijayasarathy Kannan
Thanks.

I tried persist() on the RDD. The runtimes appear to have doubled now
(without persist() it was ~7s per iteration and now it's ~15s). I am running
standalone Spark on an 8-core machine.
Any thoughts on why the increase in runtime?
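
One hedged explanation for the slowdown: MEMORY_ONLY_SER keeps the cached partitions serialized, so every iteration pays deserialization CPU, whereas on a single 8-core machine with enough RAM a deserialized cache is often cheaper. A sketch of the variant that may be worth timing:

import org.apache.spark.storage.StorageLevel

val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY)  // deserialized objects in memory
grouped.count()  // optional: force the shuffle and the cache once, before timing the iterations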

On Thu, Feb 26, 2015 at 4:27 PM, Imran Rashid iras...@cloudera.com wrote:


 val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
 // or whatever persistence makes more sense for you ...
 while(true) {
   val res = grouped.flatMap(F)
   res.collect.foreach(func)
   if(criteria)
     break
 }

 On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan kvi...@vt.edu
 wrote:

 Hi,

 I have the following use case.

 (1) I have an RDD of edges of a graph (say R).
 (2) do a groupBy on R (by say source vertex) and call a function F on
 each group.
 (3) collect the results from Fs and do some computation
 (4) repeat the above steps until some criteria is met

 In (2), the groups are always going to be the same (since R is grouped by
 source vertex).

 Question:
 Is R distributed every iteration (when in (2)) or is it distributed only
 once when it is created?

 A sample code snippet is below.

 while(true) {
   val res = R.groupBy[VertexId](G).flatMap(F)
   res.collect.foreach(func)
   if(criteria)
     break
 }

 Since the groups remain the same, what is the best way to go about
 implementing the above logic?





Iterating on RDDs

2015-02-26 Thread Vijayasarathy Kannan
Hi,

I have the following use case.

(1) I have an RDD of edges of a graph (say R).
(2) do a groupBy on R (by say source vertex) and call a function F on each
group.
(3) collect the results from Fs and do some computation
(4) repeat the above steps until some criteria is met

In (2), the groups are always going to be the same (since R is grouped by
source vertex).

Question:
Is R distributed every iteration (when in (2)) or is it distributed only
once when it is created?

A sample code snippet is below.

while(true) {
  val res = R.groupBy[VertexId](G).flatMap(F)
  res.collect.foreach(func)
  if(criteria)
    break
}

Since the groups remain the same, what is the best way to go about
implementing the above logic?


Re: Not able to update collections

2015-02-24 Thread Vijayasarathy Kannan
I am a beginner to Scala/Spark. Could you please elaborate on how to make an
RDD of the results of func() and collect it?


On Tue, Feb 24, 2015 at 2:27 PM, Sean Owen so...@cloudera.com wrote:

 They aren't the same 'lst'. One is on your driver. It gets copied to
 executors when the tasks are executed. Those copies are updated. But
 the updates will never reflect in the local copy back in the driver.

 You may just wish to make an RDD of the results of func() and
 collect() them back to the driver.

 On Tue, Feb 24, 2015 at 7:20 PM, kvvt kvi...@vt.edu wrote:
  I am working on the below piece of code.
 
  var lst = scala.collection.mutable.MutableList[VertexId]()
  graph.edges.groupBy[VertexId](f).foreach {
    edgesBySrc => {
      lst ++= func(edgesBySrc)
    }
  }
 
  println(lst.length)
 
  Here, the final println() always says that the length of the list is 0. The
  list is non-empty (the length of the returned list is printed correctly
  inside func()).
 
  I am not sure if I am doing the append correctly. Can someone point out
 what
  I am doing wrong?
 
 
 
 
 



Re: Not able to update collections

2015-02-24 Thread Vijayasarathy Kannan
Thanks, but it still doesn't seem to work.

Below is my entire code.

  var mp = scala.collection.mutable.Map[VertexId, Int]()

  var myRdd = graph.edges.groupBy[VertexId](f).flatMap {
    edgesBySrc => func(edgesBySrc, a, b)
  }

  myRdd.foreach {
    node => {
      mp(node) = 1
    }
  }

Values in mp do not get updated for any element in myRdd.
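
A hedged sketch of the collect()-based route Sean suggests, building the map on the driver from the returned results rather than mutating driver-side state inside foreach (which only updates copies on the executors):

val myRdd = graph.edges.groupBy[VertexId](f).flatMap { edgesBySrc => func(edgesBySrc, a, b) }

val mp = scala.collection.mutable.Map[VertexId, Int]()
myRdd.collect().foreach { node => mp(node) = 1 }  // collect() brings the results to the driver first

// If only distinct keys matter, this also works and avoids the mutable map:
// val mp2 = myRdd.distinct().collect().map(v => v -> 1).toMap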

On Tue, Feb 24, 2015 at 2:39 PM, Sean Owen so...@cloudera.com wrote:

 Instead of

 ...foreach {
   edgesBySrc => {
     lst ++= func(edgesBySrc)
   }
 }

 try

 ...flatMap { edgesBySrc => func(edgesBySrc) }

 or even more succinctly

 ...flatMap(func)

 This returns an RDD that basically has the list you are trying to
 build, I believe.

 You can collect() to the driver but beware if it is a huge data set.

 If you really just mean to count the results, you can count() instead

 On Tue, Feb 24, 2015 at 7:35 PM, Vijayasarathy Kannan kvi...@vt.edu
 wrote:
  I am a beginner to Scala/Spark. Could you please elaborate on how to make
  RDD of results of func() and collect?
 
 
  On Tue, Feb 24, 2015 at 2:27 PM, Sean Owen so...@cloudera.com wrote:
 
  They aren't the same 'lst'. One is on your driver. It gets copied to
  executors when the tasks are executed. Those copies are updated. But
  the updates will never reflect in the local copy back in the driver.
 
  You may just wish to make an RDD of the results of func() and
  collect() them back to the driver.
 
  On Tue, Feb 24, 2015 at 7:20 PM, kvvt kvi...@vt.edu wrote:
   I am working on the below piece of code.
  
   var lst = scala.collection.mutable.MutableList[VertexId]()
   graph.edges.groupBy[VertexId](f).foreach {
     edgesBySrc => {
       lst ++= func(edgesBySrc)
     }
   }
  
   println(lst.length)
  
   Here, the final println() always says that the length of the list is 0. The
   list is non-empty (the length of the returned list is printed correctly
   inside func()).
  
   I am not sure if I am doing the append correctly. Can someone point
 out
   what
   I am doing wrong?
  
  
  
  
  
 
 



Re: RDD groupBy

2015-02-23 Thread Vijayasarathy Kannan
You are right. I was looking at the wrong logs. I ran it on my local
machine and saw that the println actually wrote the vertexIds. I was then
able to find the same output in the executors' logs on the remote machine.

Thanks for the clarification.

On Mon, Feb 23, 2015 at 2:00 PM, Sean Owen so...@cloudera.com wrote:

 Here, println isn't happening on the driver. Are you sure you are
 looking at the right machine's logs?

 Yes this may be parallelized over many machines.

 On Mon, Feb 23, 2015 at 6:37 PM, kvvt kvi...@vt.edu wrote:
  In the snippet below,
 
  graph.edges.groupBy[VertexId](f1).foreach {
    edgesBySrc => {
      f2(edgesBySrc).foreach {
        vertexId => {
          *println(vertexId)*
        }
      }
    }
  }
 
  f1 is a function that determines how to group the edges (in my case it
  groups by source vertex)
  f2 is another function that does some computation on the edges. It
 returns
  an iterable (Iterable[VertexId]).
 
  *Questions:*
 
  1. The problem is that println(vertexId) doesn't print anything. I have
  made sure that f2 doesn't return an empty iterable. I am not sure what I
  am missing here.
 
  2. I am assuming that f2 is called for each group in parallel. Is this
  correct? If not, what is the correct way to operate on each group in
  parallel?
 
 
  Thanks!
 
 
 
 



Re: Processing graphs

2015-02-18 Thread Vijayasarathy Kannan
Hi,

Thanks for your reply.

I basically want to check whether my understanding of what parallelize() does
on RDDs is correct. In my case, I create a vertex RDD and an edge RDD and
distribute them by calling parallelize(). Does Spark now perform any operation
on these RDDs in parallel?

For example, if I apply groupBy on the edge RDD (grouping by source vertex)
and call a function F on the grouped RDD, will F be applied to each group in
parallel, and will Spark determine how to do this in parallel regardless of
the number of groups?
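
A minimal sketch of that pattern in Spark terms (edgeList and the per-group computation below are placeholders): parallelize builds the distributed dataset, groupBy repartitions by key via a shuffle, the per-group work runs inside executor tasks, in parallel across partitions and regardless of how many groups there are, and collect() (or a reduce) brings the per-group results back to the driver.

val edgeList = Seq((1L, 2L), (1L, 3L), (2L, 3L))                 // placeholder (srcId, dstId) pairs
val edges = sc.parallelize(edgeList)                             // distributed from the driver
val grouped = edges.groupBy { case (src, _) => src }             // shuffle: all edges of a source together
val perGroup = grouped.map { case (src, es) => (src, es.size) }  // placeholder per-group computation
val results = perGroup.collect()                                 // per-group results back at the driver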

Thanks.

On Tue, Feb 17, 2015 at 5:03 PM, Yifan LI iamyifa...@gmail.com wrote:

 Hi Kannan,

 I am not sure I have understood what your question is exactly, but maybe
 the reduceByKey or reduceByKeyLocally functionality is better suited to your need.



 Best,
 Yifan LI





 On 17 Feb 2015, at 17:37, Vijayasarathy Kannan kvi...@vt.edu wrote:

 Hi,

 I am working on a Spark application that processes graphs and I am trying
 to do the following.

 - group the vertices (key - vertex, value - set of its outgoing edges)
 - distribute each key to separate processes and process them (like mapper)
 - reduce the results back at the main process

 Does the groupBy functionality do the distribution by default?
 Do we have to explicitly use RDDs to enable automatic distribution?

 It'd be great if you could help me understand these and how to go about
 with the problem.

 Thanks.





Processing graphs

2015-02-17 Thread Vijayasarathy Kannan
Hi,

I am working on a Spark application that processes graphs and I am trying
to do the following.

- group the vertices (key - vertex, value - set of its outgoing edges)
- distribute each key to separate processes and process them (like mapper)
- reduce the results back at the main process

Does the groupBy functionality do the distribution by default?
Do we have to explicitly use RDDs to enable automatic distribution?

It'd be great if you could help me understand these and how to go about
with the problem.

Thanks.