Re: PySpark script works itself, but fails when called from other script

2013-11-18 Thread Andrei
I've tried adding task.py to pyFiles during SparkContext creation and it
worked perfectly. Thanks for your help!
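For the archive, a minimal sketch of the change inside task.py's main() (the master URL is the same placeholder used below):

# inside task.py -- ship the module itself to the executors
from pyspark import SparkContext

def main(spark_master, path):
    sc = SparkContext(spark_master, 'My Job', pyFiles=['task.py'])
    rdd = sc.textFile(path).map(process)  # process defined in task.py as before
    print(rdd.count())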

If you need some more information for further investigation, here's what
I've noticed. Without explicitly adding the file to the SparkContext, only
functions that are defined in the main module run by PySpark can be passed to
distributed jobs. E.g. if I define myfunc() in runner.py (and run
runner.py), it works pretty well. But if I define myfunc() in task.py (and
still run runner.py), it fails as I've described above. I've posted the stderr
from the failed executor here: http://pastebin.com/NHNW3sTY, but essentially
it just says that the Python worker crashed without any reference to the cause.
For the sake of completeness, here's also the console
output: http://pastebin.com/Lkvdfhhz

To make it clear: all these errors occur only in my initial setup; adding
task.py to the SparkContext fixes them in every case. Hope this helps.

Thanks,
Andrei





On Sat, Nov 16, 2013 at 2:12 PM, Andrei faithlessfri...@gmail.com wrote:

 Hi,

 thanks for your replies. I'm out of office now, so I will check it out on
 Monday morning, but guess about serialization/deserialization looks
 plausible.

 Thanks,
 Andrei


 On Sat, Nov 16, 2013 at 11:11 AM, Jey Kottalam j...@cs.berkeley.edu wrote:

 Hi Andrei,

 Could you please post the stderr logfile from the failed executor? You
 can find this in the work subdirectory of the worker that had the failed
 task. You'll need the executor id to find the corresponding stderr file.

 Thanks,
 -Jey


 On Friday, November 15, 2013, Andrei wrote:

 I have 2 Python modules/scripts - task.py and runner.py. First one
 (task.py) is a little Spark job and works perfectly well by itself.
 However, when called from runner.py with exactly the same arguments, it
 fails with only a useless message (both in the terminal and in the worker logs).

 org.apache.spark.SparkException: Python worker exited unexpectedly
 (crashed)

 Below is the code for both task.py and runner.py:

 task.py
 ---

 #!/usr/bin/env pyspark
 from __future__ import print_function
 from pyspark import SparkContext

 def process(line):
     return line.strip()

 def main(spark_master, path):
     sc = SparkContext(spark_master, 'My Job')
     rdd = sc.textFile(path)
     rdd = rdd.map(process)  # this line causes trouble when called from runner.py
     count = rdd.count()
     print(count)

 if __name__ == '__main__':
     main('spark://spark-master-host:7077',
          'hdfs://hdfs-namenode-host:8020/path/to/file.log')


 runner.py
 -

 #!/usr/bin/env pyspark

 import task

 if __name__ == '__main__':
     task.main('spark://spark-master-host:7077',
               'hdfs://hdfs-namenode-host:8020/path/to/file.log')


 ---

 So, what's the difference between calling PySpark-enabled script
 directly and as Python module? What are good rules for writing multi-module
 Python programs with Spark?

 Thanks,
 Andrei





How to efficiently manage resources across a cluster and avoid GC overhead exceeded errors?

2013-11-18 Thread ioannis.deligiannis
Hi,

I have a cluster of 20 servers, each having 24 cores and 30GB of RAM allocated 
to Spark. Spark runs in STANDALONE mode.
I am trying to load some 200+GB files and cache the rows using .cache().

What I would like to do is the following (at the moment from the Scala console):
- Evenly load the files across the 20 servers (preferably using all 20*24 cores 
for the load)
- Verify that the data are loaded as NODE_LOCAL
Looking into the :4040 console, I see in some runs a lot of NODE_LOCAL but in 
others a lot of ANY. Is there a way to identify what a TID marked ANY is doing?

If I allocate less than ~double the memory I need, I get an OutOfMemory error.

If I use the textFile(int) parameter, i.e. sc.textFile("hdfs://...", 20), then 
the error goes away.
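
For illustration, a rough sketch of that from the Scala console (the path is a 
placeholder; in 0.8 the second argument to textFile is the minimum number of splits):

val lines = sc.textFile("hdfs://namenode:8020/path/to/files", 20 * 24) // ~1 split per core
lines.cache()
println(lines.count())          // forces the load so the partitions get cached
println(lines.partitions.size)  // how many partitions were actually created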

On the other hand, if I allocate enough memory, I can see from the admin 
console that some of my workers have too much load and some others less than 
half. I understand that I could use a partitioner to balance my data but I 
wouldn't expect an OOME if nodes are significantly under-used. Am I missing 
something?

Thanks,

Ioannis Deligiannis




Re: App master failed to find application jar in the master branch on YARN

2013-11-18 Thread Tom Graves
Hey Jiacheng Guo,

do you have SPARK_EXAMPLES_JAR env variable set?  If you do, you have to add 
the --addJars parameter to the yarn client and point to the spark examples jar. 
 Or just unset SPARK_EXAMPLES_JAR env variable.

You should only have to set SPARK_JAR env variable.  

If that isn't the issue, let me know the build command you used, the hadoop 
version, and your defaultFs for hadoop.

Tom



On Saturday, November 16, 2013 2:32 AM, guojc guoj...@gmail.com wrote:
 
hi,
   After reading about the exciting progress in consolidating shuffle, I'm eager 
to try out the latest master branch. However, upon launching the example 
application, the job failed with a prompt that the app master failed to find the 
target jar. appDiagnostics: Application application_1384588058297_0017 failed 1 
times due to AM Container for appattempt_1384588058297_0017_01 exited with  
exitCode: -1000 due to: java.io.FileNotFoundException: File 
file:/${my_work_dir}/spark/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
 does not exist.

  Is there any change on how to launch a yarn job now?

Best Regards,
Jiacheng Guo

TeraSort on Spark

2013-11-18 Thread Rivera, Dario
Hello spark community.
I wanted to ask if any work has been done on porting TeraSort (Tera 
Gen/Sort/Validate) from Hadoop to Spark on EC2/EMR.
I am looking for some guidance on lessons learned from this or similar efforts 
as we are trying to do some benchmarking on some of the newer EC2 instances to 
determine how to optimize in-memory processing of these instances with Spark 
for some of AWS' customers looking to move to Spark for their data processing 
workloads.

Any guidance the community can provide on this effort is greatly appreciated!

Thanks,

Dario Rivera
Solutions Architect
Cell: 571-205-2731
Email: dar...@amazon.com


Re: foreachPartition in Java

2013-11-18 Thread Yadid Ayzenberg

Great, I will use mapPartitions instead.
Thanks for the advice,

Yadid
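
For the archive, a rough sketch of that workaround in Scala, dropping to the
Scala RDD via rdd() as Aaron suggested (createDbConnection and insert are
hypothetical stand-ins for the real DB calls):

// javaRdd is a JavaRDD[Record]; emulate foreachPartition with mapPartitions
javaRdd.rdd().mapPartitions { iter =>
  val conn = createDbConnection()              // hypothetical: built once per partition
  iter.foreach(record => conn.insert(record))  // hypothetical insert call
  conn.close()
  Iterator(true)                               // dummy result, one element per partition
}.count()                                      // count() forces the partitions to run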


On 11/17/13 8:13 PM, Aaron Davidson wrote:
Also, in general, you can workaround shortcomings in the Java API by 
converting to a Scala RDD (using JavaRDD's rdd() method). The API 
tends to be much clunkier since you have to jump through some hoops to 
talk to a Scala API in Java, though. In this case, JavaRDD's 
mapPartition() method will likely be the cleanest solution as Patrick 
said.



On Sun, Nov 17, 2013 at 5:03 PM, Patrick Wendell pwend...@gmail.com wrote:


Can you just call mapPartitions and ignore the result?

- Patrick

On Sun, Nov 17, 2013 at 4:45 PM, Yadid Ayzenberg
ya...@media.mit.edu wrote:
 Hi,

 According to the API, foreachPartition() is not yet implemented in Java.
 Are there any workarounds to get the same functionality? I have a
 non-serializable DB connection and instantiating it is pretty expensive,
 so I prefer to do it on a per-partition basis.

 thanks,
 Yadid







Fast Data Processing with Spark

2013-11-18 Thread R. Revert
Hello

Has anyone read the Fast Data Processing with Spark book (
http://www.amazon.com/Fast-Processing-Spark-Holden-Karau/dp/1782167064/ref=sr_1_1?ie=UTF8&qid=1384791032&sr=8-1&keywords=fast+spark+data+processing
)?

Are there any reviews or opinions about the material? I'm thinking of buying the
book so I can learn more about Spark, but I didn't see any comments.

Thanks



*#__*
*Atte.*
*Rafael R.*


Re: App master failed to find application jar in the master branch on YARN

2013-11-18 Thread guojc
Hi Tom,
   I'm on Hadoop 2.0.5.  I can launch applications with the Spark 0.8 release
normally. However, when I switch to the git master branch version, with the
application built against it, I get the jar-not-found exception, and the same
happens to the example application. I have tried both the file:// protocol and
the hdfs:// protocol, with the jar in the local file system and in HDFS
respectively, and even tried the jar list parameter when creating a new
SparkContext.  The exception is slightly different for the hdfs protocol and
the local file path. My application launch command is

 
SPARK_JAR=/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
/home/work/guojiacheng/spark/spark-class
 org.apache.spark.deploy.yarn.Client --jar
/home/work/guojiacheng/spark-auc/target/scala-2.9.3/SparkAUC-assembly-0.1.jar
--class  myClass.SparkAUC --args -c --args yarn-standalone  --args -i
--args hdfs://{hdfs_host}:9000/user/work/guojiacheng/data --args -m --args
hdfs://{hdfs_host}:9000/user/work/guojiacheng/model_large --args -o --args
hdfs://{hdfs_host}:9000/user/work/guojiacheng/score --num-workers 60
 --master-memory 6g --worker-memory 7g --worker-cores 1

And my build command is SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true
sbt/sbt assembly

The only thing I can think of that might be related is that each cluster node
has a SPARK_HOME env variable pointing to a copy of the 0.8 version, and its bin
folder is in the PATH environment variable, while the 0.9 version is not there.
It was something left over from when the cluster was set up.  But I don't know
whether it is related, as my understanding is that the YARN version tries to
distribute Spark through YARN.

hdfs version error message:

 appDiagnostics: Application application_1384588058297_0056 failed
1 times due to AM Container for appattempt_1384588058297_0056_01 exited
with  exitCode: -1000 due to: RemoteTrace:
java.io.FileNotFoundException: File
file:/home/work/.sparkStaging/application_1384588058297_0056/SparkAUC-assembly-0.1.jar
does not exist

local version error message.
appDiagnostics: Application application_1384588058297_0066 failed 1 times
due to AM Container for appattempt_1384588058297_0066_01 exited with
 exitCode: -1000 due to: java.io.FileNotFoundException: File
file:/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
does not exist

Best Regards,
Jiacheng Guo



On Mon, Nov 18, 2013 at 10:34 PM, Tom Graves tgraves...@yahoo.com wrote:

 Hey Jiacheng Guo,

 do you have SPARK_EXAMPLES_JAR env variable set?  If you do, you have to
 add the --addJars parameter to the yarn client and point to the spark
 examples jar.  Or just unset SPARK_EXAMPLES_JAR env variable.

 You should only have to set SPARK_JAR env variable.

 If that isn't the issue, let me know the build command you used, the hadoop
  version, and your defaultFs for hadoop.

 Tom


   On Saturday, November 16, 2013 2:32 AM, guojc guoj...@gmail.com wrote:
  hi,
After reading about the exciting progress in consolidating shuffle, I'm
  eager to try out the latest master branch. However, upon launching the
  example application, the job failed with a prompt that the app master failed
  to find the target jar. appDiagnostics: Application
 application_1384588058297_0017 failed 1 times due to AM Container for
 appattempt_1384588058297_0017_01 exited with  exitCode: -1000 due to:
 java.io.FileNotFoundException: File
 file:/${my_work_dir}/spark/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
 does not exist.

   Is there any change on how to launch a yarn job now?

 Best Regards,
 Jiacheng Guo






Re: code review - splitting columns

2013-11-18 Thread Tom Vacek
This is in response to your question about something in the API that
already does this.  You might want to keep your eye on MLI (
http://www.mlbase.org), which is a columnar table written for machine
learning but applicable to a lot of problems.  It's not perfect right now.


On Fri, Nov 15, 2013 at 7:56 PM, Aaron Davidson ilike...@gmail.com wrote:

 Regarding only your last point, you could always split backwards to avoid
 having to worry about updated indices (i.e., split the highest index column
 first). But if you're additionally worried about efficiency, a combined
 approach could make more sense to avoid making two full passes on the data.

 Otherwise, I don't see anything particularly amiss here, but I'm no expert.


 On Wed, Nov 13, 2013 at 3:00 PM, Philip Ogren philip.og...@oracle.com wrote:

 Hi Spark community,

 I learned a lot the last time I posted some elementary Spark code here.
  So, I thought I would do it again.  Someone politely tell me offline if
 this is noise or unfair use of the list!  I acknowledge that this borders
 on asking Scala 101 questions

 I have an RDD[List[String]] corresponding to columns of data and I want
 to split one of the columns using some arbitrary function and return an RDD
 updated with the new columns.  Here is the code I came up with.

 def splitColumn(columnsRDD: RDD[List[String]], columnIndex: Int,
     numSplits: Int, splitFx: String => List[String]): RDD[List[String]] = {

   def insertColumns(columns: List[String]): List[String] = {
     val split = columns.splitAt(columnIndex)
     val left = split._1
     val splitColumn = split._2.head
     val splitColumns = splitFx(splitColumn).padTo(numSplits, "").take(numSplits)
     val right = split._2.tail
     left ++ splitColumns ++ right
   }

   columnsRDD.map(columns => insertColumns(columns))
 }

 Here is a simple test that demonstrates the behavior:

   val spark = new SparkContext("local", "test spark")
   val testStrings = List(List("1.2", "a b"), List("3.4", "c d e"),
     List("5.6", "f"))
   var testRDD: RDD[List[String]] = spark.parallelize(testStrings)
   testRDD = splitColumn(testRDD, 0, 2, _.split("\\.").toList)
   testRDD = splitColumn(testRDD, 2, 2, _.split(" ").toList) // Line 5
   val actualStrings = testRDD.collect.toList
   assertEquals(4, actualStrings(0).length)
   assertEquals("1, 2, a, b", actualStrings(0).mkString(", "))
   assertEquals(4, actualStrings(1).length)
   assertEquals("3, 4, c, d", actualStrings(1).mkString(", "))
   assertEquals(4, actualStrings(2).length)
   assertEquals("5, 6, f, ", actualStrings(2).mkString(", "))


 My first concern about this code is that I'm missing out on something
 that does exactly this in the API.  This seems like such a common use case
 that I would not be surprised if there's a readily available way to do this.

 I'm a little uncertain about the typing of splitColumn - i.e. the first
 parameter and the return value.  It seems like a general solution wouldn't
 require every column to be a String value.  I'm also annoyed that line 5 in
 the test code requires that I use an updated index to split what was
 originally the second column.  This suggests that perhaps I should split
 all the columns that need splitting in one function call - but it seems
 like doing that would require an unwieldy function signature.

 Any advice or insight is appreciated!

 Thanks,
 Philip





Re: Spark Avro in Scala

2013-11-18 Thread Matt Massie
Agree with Eugen that you should use Kryo.

But even better is to embed your Avro objects inside of Kryo. This allows
you to have the benefits of both Avro and Kryo.

Here's example code for using Avro with Kryo.

https://github.com/massie/adam/blob/master/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/serialization/AdamKryoRegistrator.scala

You need to register all of your Avro specific classes with Kryo and have it
use the AvroSerializer class to encode/decode them.

e.g.

class AdamKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MySpecificAvroClass],
      new AvroSerializer[MySpecificAvroClass]())
  }
}
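
To wire that in, a sketch assuming the Spark 0.8-era system-property
configuration (the registrator's package name is a placeholder):

// Set before creating the SparkContext so executors pick up the Kryo settings
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "com.example.AdamKryoRegistrator")
val sc = new SparkContext("spark://master:7077", "Avro + Kryo job")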










--
Matt Massie
UC, Berkeley AMPLab
Twitter: @matt_massie https://twitter.com/matt_massie,
@amplabhttps://twitter.com/amplab
https://amplab.cs.berkeley.edu/


On Mon, Nov 18, 2013 at 10:45 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote:

 Hi Robert,

 The problem is that Spark uses Java serialization, which requires serialized
 objects to implement Serializable, and AvroKey doesn't.
 As a workaround you can try using Kryo
 (http://spark.incubator.apache.org/docs/latest/tuning.html#data-serialization)
 for the serialization.

 Eugen


 2013/11/11 Robert Fink ursula2...@gmail.com

 Hi,

 I am trying to get the following minimal Scala example work: Using Spark
 to process Avro records. Here's my dummy Avro definition:

 {
   "namespace": "com.avrotest",
   "type": "record",
   "name": "AvroTest",
   "fields": [
     {"name": "field1", "type": ["string", "null"]}
   ]
 }

 I experiment with a simple job that creates three AvroTest objects,
 writes them out to a file through a SparkContext, and then reads in the
 thus generated Avro file and performs a simple grouping operation:

 // -----------------------------------------------------------------
 import org.apache.spark.SparkContext._
 import org.apache.avro.specific.SpecificDatumWriter
 import org.apache.avro.file.DataFileWriter
 import org.apache.avro._
 import org.apache.avro.generic._
 import org.apache.hadoop.mapreduce.Job
 import com.avrotest.AvroTest
 import java.io.File

 object SparkTest {
   def main(args: Array[String]) {

     def avrofile = "output.avro"
     def sc = new SparkContext("local", "Simple App")
     val job = new Job()

     val record1 = new AvroTest()
     record1.setField1("value1")
     val record2 = new AvroTest()
     record2.setField1("value1")
     val record3 = new AvroTest()
     record3.setField1("value2")

     def userDatumWriter = new SpecificDatumWriter[AvroTest]()
     val dataFileWriter = new DataFileWriter[AvroTest](userDatumWriter)
     def file = new File(avrofile)
     dataFileWriter.create(record1.getSchema(), file)
     dataFileWriter.append(record1)
     dataFileWriter.append(record2)
     dataFileWriter.append(record3)
     dataFileWriter.close()

     def rdd = sc.newAPIHadoopFile(
       avrofile,
       classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[AvroTest]],
       classOf[org.apache.avro.mapred.AvroKey[AvroTest]],
       classOf[org.apache.hadoop.io.NullWritable],
       job.getConfiguration)
     // rdd.foreach( x => println(x._1.datum.getField1) )
     // Prints "value1", "value1", "value2"
     val numGroups = rdd.groupBy(x => x._1.datum.getField1).count()
   }
 }
 // -----------------------------------------------------------------

 I would expect numGroups==2 in the last step, because record1 and record2
 share getField1() == "value1", and record3 has getField1() == "value2".
 However, the script fails to execute with the following error (see below).
 Can anyone give me a hint what could be wrong in the above code, or post an
 example of reading from an Avro file and performing some simple
 computations on the retrieved objects? Thank you so much! Robert.

 11650 [pool-109-thread-1] WARN
 org.apache.avro.mapreduce.AvroKeyInputFormat - Reader schema was not set.
 Use AvroJob.setInputKeySchema() if desired.
 11661 [pool-109-thread-1] INFO
 org.apache.avro.mapreduce.AvroKeyInputFormat - Using a reader schema equal
 to the writer schema.
 12293 [spark-akka.actor.default-dispatcher-5] INFO
 org.apache.spark.scheduler.local.LocalTaskSetManager - Loss was due to
 java.io.NotSerializableException
 java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at
 

debugging a Spark error

2013-11-18 Thread Chris Grier
Hi,

I'm trying to figure out what the problem is with a job that we are running
on Spark 0.7.3. When we write out via saveAsTextFile we get an exception
that doesn't reveal much:

13/11/18 15:06:19 INFO cluster.TaskSetManager: Loss was due to
java.io.IOException
java.io.IOException: Map failed
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:849)
at spark.storage.DiskStore.getBytes(DiskStore.scala:86)
at spark.storage.DiskStore.getValues(DiskStore.scala:92)
at spark.storage.BlockManager.getLocal(BlockManager.scala:284)
at spark.storage.BlockFetcherIterator$$anonfun$
13.apply(BlockManager.scala:1027)
at spark.storage.BlockFetcherIterator$$anonfun$
13.apply(BlockManager.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(
ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(
ArrayBuffer.scala:47)
at spark.storage.BlockFetcherIterator.init(
BlockManager.scala:1026)
at spark.storage.BlockManager.getMultiple(BlockManager.scala:478)
at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.
scala:51)
at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.
scala:10)
at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(
CoGroupedRDD.scala:127)
at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(
CoGroupedRDD.scala:115)
at scala.collection.IndexedSeqOptimized$class.
foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
at spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:115)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.MappedValuesRDD.compute(PairRDDFunctions.scala:704)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.FlatMappedValuesRDD.compute(PairRDDFunctions.scala:714)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.scheduler.ResultTask.run(ResultTask.scala:77)
at spark.executor.Executor$TaskRunner.run(Executor.scala:100)
at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

Any ideas?

-Chris


Re: DataFrame RDDs

2013-11-18 Thread andy petrella
Maybe I'm wrong, but this use case could be a good fit for Shapeless'
(https://github.com/milessabin/shapeless) records.

Shapeless' records are, so to say, like lisp's records but typed! In that
sense, they're closer to Haskell's record notation, but imho less
powerful, since the access will be based on String (field name) for
Shapeless where Haskell will use pure functions!

Anyway, this documentation
(https://github.com/milessabin/shapeless/wiki/Feature-overview%3a-shapeless-2.0.0#extensible-records)
is self-explanatory and shows how we (maybe) could use them to
simulate an R frame.

Thinking out loud: when reading a csv file, for instance, what would be
needed are
 * a Read[T] for each column,
 * fold'ing the list of columns by reading each and prepending the
result (combined with the name via ->>) to an HList

The gain would be that we should recover one helpful feature of R's frame,
which is:
  R     :: frame$newCol = frame$post - frame$pre
  // which adds a column to a frame
  Shpls :: frame2 = frame + ("newCol" ->> (frame("post") - frame("pre")))
  // type-safe difference between ints, for instance
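
For concreteness, a rough, untested sketch of that record syntax with
shapeless 2.0 (the field names and values here are made up):

import shapeless._, syntax.singleton._, record._

// one "row" as an extensible record: field names are singleton-typed keys
val row = ("id" ->> 1) :: ("pre" ->> 4.0) :: ("post" ->> 10.0) :: HNil

val diff = row("post") - row("pre")     // statically typed as Double
val row2 = ("newCol" ->> diff) :: row   // prepend a new column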

Of course, we're not recovering R's frame as is, because we're simply
dealing with rows one by one, whereas a frame deals with the full table
-- but in the case of Spark it would make no sense to mimic that, since
we use RDDs for that :-D.

I haven't experimented with this yet, but it'd be fun to try; don't know if
someone is interested ^^

Cheers

andy


On Fri, Nov 15, 2013 at 8:49 PM, Christopher Nguyen c...@adatao.com wrote:

 Sure, Shay. Let's connect offline.

 Sent while mobile. Pls excuse typos etc.
 On Nov 16, 2013 2:27 AM, Shay Seng s...@1618labs.com wrote:

 Nice, any possibility of sharing this code in advance?


 On Fri, Nov 15, 2013 at 11:22 AM, Christopher Nguyen c...@adatao.com wrote:

 Shay, we've done this at Adatao, specifically a big data frame in RDD
 representation and subsetting/projections/data mining/machine learning
 algorithms on that in-memory table structure.

 We're planning to harmonize that with the MLBase work in the near
 future. Just a matter of prioritization on limited resources. If there's
 enough interest we'll accelerate that.

 Sent while mobile. Pls excuse typos etc.
 On Nov 16, 2013 1:11 AM, Shay Seng s...@1618labs.com wrote:

 Hi,

 Is there some way to get R-style Data.Frame data structures into RDDs?
 I've been using RDD[Seq[]] but this is getting quite error-prone and the
 code gets pretty hard to read especially after a few joins, maps etc.

 Rather than access columns by index, I would prefer to access them by
 name.
 e.g. instead of writing:
 myrdd.map(l => Seq(l(0), l(1), l(4), l(9)))
 I would prefer to write
 myrdd.map(l => DataFrame(l.id, l.entryTime, l.exitTime, l.cost))

 Also joins are particularly irritating. Currently I have to first
 construct a pair:
 somePairRdd.join(myrdd.map(l => ((l(1), l(2)), (l(0), l(1), l(2), l(3)))))
 Now I have to unzip away the join-key and remap the values into a seq

 instead I would rather write
 someDataFrame.join(myrdd, l => l.entryTime < l.exitTime)


 The question is this:
 (1) I started writing a DataFrameRDD class that kept track of the
 column names and column values, and some optional attributes common to the
 entire dataframe. However I got a little muddled when trying to figure out
 what happens when a dataframRDD is chained with other operations and get
 transformed to other types of RDDs. The Value part of the RDD is obvious,
 but I didn't know the best way to pass on the column and attribute
 portions of the DataFrame class.

 I googled around for some documentation on how to write RDDs, but only
 found a pptx slide presentation with very vague info. Is there a better
 source of info on how to write RDDs?

 (2) Even better than info on how to write RDDs, has anyone written an
 RDD that functions as a DataFrame? :-)

 tks
 shay





EC2 node submit jobs to separate Spark Cluster

2013-11-18 Thread Matt Cheah
Hi,

I'm working with an infrastructure that already has its own web server set up 
on EC2. I would like to set up a separate spark cluster on EC2 with the scripts 
and have the web server submit jobs to this spark cluster.

Is it possible to do this? I'm getting some errors running the spark shell from 
the web server: Initial job has not accepted any resources; 
check your cluster UI to ensure that workers are registered and have sufficient 
memory. I have heard that it's not possible for any local computer to connect 
to the spark cluster, but I was wondering if other EC2 nodes could have their 
firewalls configured to allow this.

We don't want to deploy the web server on the master node of the spark cluster.

Thanks,

-Matt Cheah




Can not get the expected output when running the BroadcastTest example program.

2013-11-18 Thread 杨强
Hi, all.
I'm using spark-0.8.0-incubating.

I tried the example BroadcastTest in local mode:
./run-example org.apache.spark.examples.BroadcastTest local 1 2>/dev/null
This works fine and I get the result:
Iteration 0
===
100
100
100
100
100
100
100
100
100
100
Iteration 1
===
100
100
100
100
100
100
100
100
100
100

But when I run this program in the cluster (standalone mode) with:
./run-example org.apache.spark.examples.BroadcastTest spark://172.16.1.39:7077 
5 2>/dev/null
The output is as follows:
Iteration 0
===
Iteration 1
===

I also tried the command
./run-example org.apache.spark.examples.BroadcastTest spark://172.16.1.39:7077 5
but I did not find any error message.

Hope someone can give me some advices. Thank you.


The content of file etc/spark-env.sh is as follows:

export SCALA_HOME=/usr/lib/scala-2.9.3
export SPARK_MASTER_IP=172.16.1.39
export SPARK_MASTER_WEBUI_PORT=8090
export SPARK_WORKER_WEBUI_PORT=8091
export SPARK_WORKER_MEMORY=2G
#export 
SPARK_CLASSPATH=.:/home/spark-0.7.3/core/target/spark-core-assembly-0.7.3.jar:$SPACK_CLASSPATH
export 
SPARK_CLASSPATH=.:/home/hadoop/spark-0.8.0-incubating/conf:/home/hadoop/spark-0.8.0-incubating/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.0.1.jar:/home/hadoop/hadoop-1.0.1/conf




Sincerely

Yang, Qiang

Re: debugging a Spark error

2013-11-18 Thread Aaron Davidson
Have you looked at the Spark executor logs? They're usually located in the
$SPARK_HOME/work/ directory. If you're running in a cluster, they'll be on
the individual slave nodes. These should hopefully reveal more information.


On Mon, Nov 18, 2013 at 3:42 PM, Chris Grier gr...@icsi.berkeley.edu wrote:

 Hi,

 I'm trying to figure out what the problem is with a job that we are
 running on Spark 0.7.3. When we write out via saveAsTextFile we get an
 exception that doesn't reveal much:

 13/11/18 15:06:19 INFO cluster.TaskSetManager: Loss was due to
 java.io.IOException
 java.io.IOException: Map failed
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:849)
 at spark.storage.DiskStore.getBytes(DiskStore.scala:86)
 at spark.storage.DiskStore.getValues(DiskStore.scala:92)
 at spark.storage.BlockManager.getLocal(BlockManager.scala:284)
 at spark.storage.BlockFetcherIterator$$anonfun$
 13.apply(BlockManager.scala:1027)
 at spark.storage.BlockFetcherIterator$$anonfun$
 13.apply(BlockManager.scala:1026)
 at scala.collection.mutable.ResizableArray$class.foreach(
 ResizableArray.scala:60)
 at scala.collection.mutable.ArrayBuffer.foreach(
 ArrayBuffer.scala:47)
 at spark.storage.BlockFetcherIterator.init(
 BlockManager.scala:1026)
 at spark.storage.BlockManager.getMultiple(BlockManager.scala:478)
 at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.
 scala:51)
 at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.
 scala:10)
 at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(
 CoGroupedRDD.scala:127)
 at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(
 CoGroupedRDD.scala:115)
 at scala.collection.IndexedSeqOptimized$class.
 foreach(IndexedSeqOptimized.scala:34)
 at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
 at spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:115)
 at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
 at spark.RDD.iterator(RDD.scala:196)
 at spark.MappedValuesRDD.compute(PairRDDFunctions.scala:704)
 at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
 at spark.RDD.iterator(RDD.scala:196)
 at spark.FlatMappedValuesRDD.compute(PairRDDFunctions.scala:714)
 at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
 at spark.RDD.iterator(RDD.scala:196)
 at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
 at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
 at spark.RDD.iterator(RDD.scala:196)
 at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
 at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
 at spark.RDD.iterator(RDD.scala:196)
 at spark.scheduler.ResultTask.run(ResultTask.scala:77)
 at spark.executor.Executor$TaskRunner.run(Executor.scala:100)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(
 ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(
 ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)

 Any ideas?

 -Chris



Re: EC2 node submit jobs to separate Spark Cluster

2013-11-18 Thread Aaron Davidson
The main issue with running a spark-shell locally is that it orchestrates
the actual computation, so you want it to be close to the actual Worker
nodes for latency reasons. Running a spark-shell on EC2 in the same region
as the Spark cluster avoids this problem.

The error you're seeing seems to indicate a different issue. Check the
Master web UI (accessible on port 8080 at the master's IP address) to make
sure that Workers are successfully registered and they have the expected
amount of memory available to Spark. You can also check to see how much
memory your spark-shell is trying to get per executor. A couple common
problems are (1) an abandoned spark-shell is holding onto all of your
cluster's resources or (2) you've manually configured your spark-shell to
try to get more memory than your Workers have available. Both of these
should be visible in the web UI.


On Mon, Nov 18, 2013 at 5:00 PM, Matt Cheah mch...@palantir.com wrote:

  Hi,

  I'm working with an infrastructure that already has its own web server
 set up on EC2. I would like to set up a *separate* spark cluster on EC2
 with the scripts and have the web server submit jobs to this spark cluster.

  Is it possible to do this? I'm getting some errors running the spark
 shell from the web server: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory. I have heard that it's not possible for any
 local computer to connect to the spark cluster, but I was wondering if
 other EC2 nodes could have their firewalls configured to allow this.

  We don't want to deploy the web server on the master node of the spark
 cluster.

  Thanks,

  -Matt Cheah





Re: Joining files

2013-11-18 Thread Something Something
Was my question so dumb?  Or, is this not a good use case for Spark?


On Sun, Nov 17, 2013 at 11:41 PM, Something Something 
mailinglist...@gmail.com wrote:

 I am a newbie to both Spark & Scala, but I've been working with Hadoop/Pig
 for quite some time.

 We've quite a few ETL processes running in production that use Pig, but
 now we're evaluating Spark to see if they would indeed run faster.

 A very common use case in our Pig script is joining a file containing
 Facts to a file containing Dimension data.  The joins are, of course, inner,
 left & outer.

 I thought I would start simple.  Let's say I've 2 files:

 1)  Students:  student_id, course_id, score
 2)  Course:  course_id, course_title

 We want to produce a file that contains:  student_id, course_title, score

 (Note:  This is a hypothetical case.  The real files have millions of
 facts & thousands of dimensions.)

 Would something like this work?  Note:  I did say I am a newbie ;)

 val students = sc.textFile("./students.txt")
 val courses = sc.textFile("./courses.txt")
 val s = students.map(x => x.split(','))
 val left = students.map(x => x.split(',')).map(y => (y(1), y))
 val right = courses.map(x => x.split(',')).map(y => (y(0), y))
 val joined = left.join(right)
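
 For what it's worth, a rough sketch of how the joined result might then be
 mapped down to the desired columns, assuming the column layout above (the
 output path is just for illustration):

 // joined: RDD[(String, (Array[String], Array[String]))]
 // key = course_id, values = (student columns, course columns)
 val result = joined.map { case (courseId, (student, course)) =>
   (student(0), course(1), student(2))  // (student_id, course_title, score)
 }
 result.saveAsTextFile("./student_course_scores")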


 Any pointers in this regard would be greatly appreciated.  Thanks.



Re: DataFrame RDDs

2013-11-18 Thread Matei Zaharia
Interesting idea — in Scala you can also use the Dynamic type 
(http://hacking-scala.org/post/49051516694/introduction-to-type-dynamic) to 
allow dynamic properties. It has the same potential pitfalls as string names, 
but with nicer syntax.
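
For illustration, a minimal sketch of that idea (the Row class and its field
map are hypothetical, not part of Spark; Dynamic requires Scala 2.10+):

import scala.language.dynamics

// Hypothetical row wrapper: field access by name via Scala's Dynamic trait
class Row(fields: Map[String, String]) extends Dynamic {
  def selectDynamic(name: String): String = fields(name)
}

val r = new Row(Map("id" -> "42", "entryTime" -> "100", "exitTime" -> "160"))
println(r.entryTime)  // compiles to r.selectDynamic("entryTime")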

Matei

On Nov 18, 2013, at 3:45 PM, andy petrella andy.petre...@gmail.com wrote:

 Maybe I'm wrong, but this use case could be a good fit for Shapeless' records.
 
 Shapeless' records are, so to say, like lisp's records but typed! In that 
 sense, they're closer to Haskell's record notation, but imho less 
 powerful, since the access will be based on String (field name) for Shapeless 
 where Haskell will use pure functions!
 
 Anyway, this documentation is self-explanatory and shows how we 
 (maybe) could use them to simulate an R frame.
 
 Thinking out loud: when reading a csv file, for instance, what would be 
 needed are 
  * a Read[T] for each column, 
  * fold'ing the list of columns by reading each and prepending the result 
 (combined with the name via ->>) to an HList
 
 The gain would be that we should recover one helpful feature of R's frame, 
 which is:
   R     :: frame$newCol = frame$post - frame$pre
 // which adds a column to a frame
   Shpls :: frame2 = frame + ("newCol" ->> (frame("post") - frame("pre")))
 // type-safe difference between ints, for instance

 Of course, we're not recovering R's frame as is, because we're simply dealing 
 with rows one by one, whereas a frame deals with the full table -- but in 
 the case of Spark it would make no sense to mimic that, since we use RDDs 
 for that :-D.
 
 I haven't experimented with this yet, but it'd be fun to try; don't know if 
 someone is interested ^^
 
 Cheers
 
 andy
 
 
 On Fri, Nov 15, 2013 at 8:49 PM, Christopher Nguyen c...@adatao.com wrote:
 Sure, Shay. Let's connect offline.
 
 Sent while mobile. Pls excuse typos etc.
 
 On Nov 16, 2013 2:27 AM, Shay Seng s...@1618labs.com wrote:
 Nice, any possibility of sharing this code in advance? 
 
 
 On Fri, Nov 15, 2013 at 11:22 AM, Christopher Nguyen c...@adatao.com wrote:
 Shay, we've done this at Adatao, specifically a big data frame in RDD 
 representation and subsetting/projections/data mining/machine learning 
 algorithms on that in-memory table structure.
 
 We're planning to harmonize that with the MLBase work in the near future. 
 Just a matter of prioritization on limited resources. If there's enough 
 interest we'll accelerate that.
 
 Sent while mobile. Pls excuse typos etc.
 
 On Nov 16, 2013 1:11 AM, Shay Seng s...@1618labs.com wrote:
 Hi, 
 
 Is there some way to get R-style Data.Frame data structures into RDDs? I've 
 been using RDD[Seq[]] but this is getting quite error-prone and the code gets 
 pretty hard to read especially after a few joins, maps etc. 
 
 Rather than access columns by index, I would prefer to access them by name.
 e.g. instead of writing:
 myrdd.map(l => Seq(l(0), l(1), l(4), l(9)))
 I would prefer to write
 myrdd.map(l => DataFrame(l.id, l.entryTime, l.exitTime, l.cost))
 
 Also joins are particularly irritating. Currently I have to first construct a 
 pair:
 somePairRdd.join(myrdd.map(l => ((l(1), l(2)), (l(0), l(1), l(2), l(3)))))
 Now I have to unzip away the join-key and remap the values into a seq
 
 instead I would rather write
 someDataFrame.join(myrdd, l => l.entryTime < l.exitTime)
 
 
 The question is this:
 (1) I started writing a DataFrameRDD class that kept track of the column 
 names and column values, and some optional attributes common to the entire 
 dataframe. However I got a little muddled when trying to figure out what 
 happens when a dataframRDD is chained with other operations and get 
 transformed to other types of RDDs. The Value part of the RDD is obvious, but 
 I didn't know the best way to pass on the column and attribute portions of 
 the DataFrame class.
 
 I googled around for some documentation on how to write RDDs, but only found 
 a pptx slide presentation with very vague info. Is there a better source of 
 info on how to write RDDs? 
 
 (2) Even better than info on how to write RDDs, has anyone written an RDD 
 that functions as a DataFrame? :-)
 
 tks
 shay