Re: Reading a large file (binary) into RDD

2015-04-02 Thread Jeremy Freeman
Hm, that will indeed be trickier because this method assumes records are the 
same byte size. Is the file an arbitrary sequence of mixed types, or is there 
structure, e.g. short, long, short, long, etc.? 

If you could post a gist with an example of the kind of file and how it should 
look once read in that would be useful!
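
If there is a fixed repeating structure (so every record still has the same total byte length), binaryRecords can still work. A minimal sketch, assuming each record is one little-endian short followed by one long (10 bytes; the file name and layout are just placeholders):

# parse fixed-layout mixed-type records with a NumPy structured dtype
from numpy import dtype, frombuffer

rec = dtype([('a', '<i2'), ('b', '<i8')])   # short, long (little-endian)
recordsize = rec.itemsize                   # 2 + 8 = 10 bytes per record

data = sc.binaryRecords('mixed.bin', recordsize)
parsed = data.map(lambda v: frombuffer(v, dtype=rec)[0])   # fields: record['a'], record['b']

If the record lengths truly vary, binaryRecords won't apply, and something like a custom Hadoop InputFormat would be needed.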

-
jeremyfreeman.net
@thefreemanlab

On Apr 2, 2015, at 2:09 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

 Thanks for the reply. Unfortunately, in my case, the binary file is a mix of 
 short and long integers. Is there any other way that could be of use here?
 
 My current method happens to have a large overhead (much more than actual 
 computation time). Also, I am short of memory at the driver when it has to 
 read the entire file.
 
 On Thu, Apr 2, 2015 at 1:44 PM, Jeremy Freeman freeman.jer...@gmail.com 
 wrote:
 If it’s a flat binary file and each record is the same length (in bytes), you 
 can use Spark’s binaryRecords method (defined on the SparkContext), which 
 loads records from one or more large flat binary files into an RDD. Here’s an 
 example in python to show how it works:
 
 # write data from an array (100 records, each with 5 float64 values)
 from numpy import random
 dat = random.randn(100,5)
 f = open('test.bin', 'wb')
 f.write(dat)
 f.close()
 
 # load the data back in
 from numpy import frombuffer
 nvalues = 5                        # values per record
 bytesize = 8                       # bytes per value (float64)
 recordsize = nvalues * bytesize    # 40 bytes per record
 data = sc.binaryRecords('test.bin', recordsize)
 parsed = data.map(lambda v: frombuffer(buffer(v, 0, recordsize), 'float'))
 
 # these should be equal
 parsed.first()
 dat[0,:]
 
 
 Does that help?
 
 -
 jeremyfreeman.net
 @thefreemanlab
 
 On Apr 2, 2015, at 1:33 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:
 
 What are some efficient ways to read a large file into RDDs?
 
 For example, have several executors read a specific/unique portion of the 
 file and construct RDDs. Is this possible to do in Spark?
 
 Currently, I am doing a line-by-line read of the file at the driver and 
 constructing the RDD.
 
 



Re: Reading a large file (binary) into RDD

2015-04-02 Thread Jeremy Freeman
If it’s a flat binary file and each record is the same length (in bytes), you 
can use Spark’s binaryRecords method (defined on the SparkContext), which loads 
records from one or more large flat binary files into an RDD. Here’s an example 
in python to show how it works:

 # write data from an array (100 records, each with 5 float64 values)
 from numpy import random
 dat = random.randn(100,5)
 f = open('test.bin', 'wb')
 f.write(dat)
 f.close()
 
 # load the data back in
 from numpy import frombuffer
 nvalues = 5                        # values per record
 bytesize = 8                       # bytes per value (float64)
 recordsize = nvalues * bytesize    # 40 bytes per record
 data = sc.binaryRecords('test.bin', recordsize)
 parsed = data.map(lambda v: frombuffer(buffer(v, 0, recordsize), 'float'))

 # these should be equal
 parsed.first()
 dat[0,:]


Does that help?

-
jeremyfreeman.net
@thefreemanlab

 On Apr 2, 2015, at 1:33 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:
 
 What are some efficient ways to read a large file into RDDs?
 
 For example, have several executors read a specific/unique portion of the 
 file and construct RDDs. Is this possible to do in Spark?
 
 Currently, I am doing a line-by-line read of the file at the driver and 
 constructing the RDD.



Re: Can LBFGS be used on streaming data?

2015-03-19 Thread Jeremy Freeman
Regarding the first question, can you say more about how you are loading your 
data? And what is the size of the data set? And is that the only error you see, 
and do you only see it in the streaming version?

For the second question, there are a couple of reasons the weights might differ 
slightly; it depends on exactly how you set up the comparison. When you split it 
into 5, were those the same 5 chunks of data you used for the streaming case? 
And were they presented to the optimizer in the same order? Difference in 
either could produce small differences in the resulting weights, but that 
doesn’t mean it’s doing anything wrong.

-
jeremyfreeman.net
@thefreemanlab

On Mar 17, 2015, at 6:19 PM, EcoMotto Inc. ecomot...@gmail.com wrote:

 Hello Jeremy,
 
 Thank you for your reply.
 
 When I am running this code on the local machine on a streaming data, it 
 keeps giving me this error:
 WARN TaskSetManager: Lost task 2.0 in stage 211.0 (TID 4138, localhost): 
 java.io.FileNotFoundException: 
 /tmp/spark-local-20150316165742-9ac0/27/shuffle_102_2_0.data (No such file or 
 directory) 
 
 And when I execute the same code on static data after randomly splitting it 
 into 5 sets, it gives me slightly different weights (the difference is in the 
 decimals). I am still trying to analyse why this would be happening.
 Any inputs on why this might be happening?
 
 Best Regards,
 Arunkumar
 
 
 On Tue, Mar 17, 2015 at 11:32 AM, Jeremy Freeman freeman.jer...@gmail.com 
 wrote:
 Hi Arunkumar,
 
 That looks like it should work. Logically, it’s similar to the implementation 
 used by StreamingLinearRegression and StreamingLogisticRegression, see this 
 class:
 
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
 
 which exposes the kind of operation you're describing (for any linear method).
 
 The nice thing about the gradient-based methods is that they can use existing 
 MLLib optimization routines in this fairly direct way. Other methods (such as 
 KMeans) require a bit more reengineering.
 
 — Jeremy
 
 -
 jeremyfreeman.net
 @thefreemanlab
 
 On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. ecomot...@gmail.com wrote:
 
 Hello,
 
 I am new to spark streaming API.
 
 I wanted to ask if I can apply LBFGS (with LeastSquaresGradient) on 
 streaming data? Currently I am using foreachRDD for parsing through DStream 
 and I am generating a model based on each RDD. Am I doing anything logically 
 wrong here?
 Thank you.
 
 Sample Code:
 val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
 var initialWeights = 
 Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
 var isFirst = true
 var model = new LinearRegressionModel(null,1.0)
 
 parsedData.foreachRDD { rdd =>
   if (isFirst) {
     val weights = algorithm.optimize(rdd, initialWeights)
     val w = weights.toArray
     val intercept = w.head
     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
     isFirst = false
   } else {
     var ab = ArrayBuffer[Double]()
     ab.insert(0, model.intercept)
     ab.appendAll(model.weights.toArray)
     print("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
     initialWeights = Vectors.dense(ab.toArray)
     print("Initial Weights: " + initialWeights)
     val weights = algorithm.optimize(rdd, initialWeights)
     val w = weights.toArray
     val intercept = w.head
     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
   }
 }
 
 
 Best Regards,
 Arunkumar
 
 



Re: Can LBFGS be used on streaming data?

2015-03-17 Thread Jeremy Freeman
Hi Arunkumar,

That looks like it should work. Logically, it’s similar to the implementation 
used by StreamingLinearRegression and StreamingLogisticRegression, see this 
class:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala

which exposes the kind of operation you're describing (for any linear method).

The nice thing about the gradient-based methods is that they can use existing 
MLLib optimization routines in this fairly direct way. Other methods (such as 
KMeans) require a bit more reengineering.

— Jeremy

-
jeremyfreeman.net
@thefreemanlab

On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. ecomot...@gmail.com wrote:

 Hello,
 
 I am new to spark streaming API.
 
 I wanted to ask if I can apply LBFGS (with LeastSquaresGradient) on streaming 
 data? Currently I am using foreachRDD for parsing through DStream and I am 
 generating a model based on each RDD. Am I doing anything logically wrong 
 here?
 Thank you.
 
 Sample Code:
 val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
 var initialWeights = 
 Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
 var isFirst = true
 var model = new LinearRegressionModel(null,1.0)
 
 parsedData.foreachRDD { rdd =>
   if (isFirst) {
     val weights = algorithm.optimize(rdd, initialWeights)
     val w = weights.toArray
     val intercept = w.head
     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
     isFirst = false
   } else {
     var ab = ArrayBuffer[Double]()
     ab.insert(0, model.intercept)
     ab.appendAll(model.weights.toArray)
     print("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
     initialWeights = Vectors.dense(ab.toArray)
     print("Initial Weights: " + initialWeights)
     val weights = algorithm.optimize(rdd, initialWeights)
     val w = weights.toArray
     val intercept = w.head
     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
   }
 }
 
 
 Best Regards,
 Arunkumar



Re: Streaming linear regression example question

2015-03-15 Thread Jeremy Freeman
Hi Margus, thanks for reporting this, I’ve been able to reproduce and there 
does indeed appear to be a bug. I’ve created a JIRA and have a fix ready, which 
can hopefully be included in 1.3.1.

In the meantime, you can get the desired result using transform:

 model.trainOn(trainingData)
 
 testingData.transform { rdd =>
   val latest = model.latestModel()
   rdd.map(lp => (lp.label, latest.predict(lp.features)))
 }.print()

-
jeremyfreeman.net
@thefreemanlab

On Mar 15, 2015, at 2:56 PM, Margus Roo mar...@roo.ee wrote:

 Hi again
 
 Tried the same 
 examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
 from 1.3.0, and in case the testing file content is:
   (0.0,[3.0,4.0,3.0])
   (0.0,[4.0,4.0,4.0])
   (4.0,[5.0,5.0,5.0])
   (5.0,[5.0,6.0,6.0])
   (6.0,[7.0,4.0,7.0])
   (7.0,[8.0,6.0,8.0])
   (8.0,[44.0,9.0,9.0])
   (9.0,[14.0,30.0,10.0])
 
 the answer is:
 (0.0,0.0)
 (0.0,0.0)
 (0.0,0.0)
 (4.0,0.0)
 (5.0,0.0)
 (6.0,0.0)
 (7.0,0.0)
 (8.0,0.0)
 (9.0,0.0)
 
 What is wrong?
 I can see that model's weights are changing in case I put new data into 
 training dir.
 Margus (margusja) Roo
 http://margus.roo.ee
 skype: margusja
 +372 51 480
 On 14/03/15 09:05, Margus Roo wrote:
 Hi
 
 I am trying to understand the example provided in 
 https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html - Streaming 
 linear regression
 
 Code:
 import org.apache.spark._
 import org.apache.spark.streaming._
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.DStream
 
 object StreamingLinReg {
 
   def main(args: Array[String]) {
   
 val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
 val ssc = new StreamingContext(conf, Seconds(10))
   
 
 val trainingData = 
 ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/").map(LabeledPoint.parse).cache()
 
 val testData = 
 ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/").map(LabeledPoint.parse)
 
 val numFeatures = 3
 val model = new 
 StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))
 
 model.trainOn(trainingData)
 model.predictOnValues(testData.map(lp => (lp.label, 
 lp.features))).print()
 
 ssc.start()
 ssc.awaitTermination()
 
   }
 
 }
 
 Compiled the code and ran it.
 Put a file containing
   (1.0,[2.0,2.0,2.0])
   (2.0,[3.0,3.0,3.0])
   (3.0,[4.0,4.0,4.0])
   (4.0,[5.0,5.0,5.0])
   (5.0,[6.0,6.0,6.0])
   (6.0,[7.0,7.0,7.0])
   (7.0,[8.0,8.0,8.0])
   (8.0,[9.0,9.0,9.0])
   (9.0,[10.0,10.0,10.0])
 into the training directory.
 
 I can see that the model's weights change:
 15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: 
 weights, [7.333,7.333,7.333]
 
 Now I can put whatever into the testing directory, but I cannot understand the 
 answer.
 For example, I can put the same file I used for training into the testing 
 directory. The file content is
   (1.0,[2.0,2.0,2.0])
   (2.0,[3.0,3.0,3.0])
   (3.0,[4.0,4.0,4.0])
   (4.0,[5.0,5.0,5.0])
   (5.0,[6.0,6.0,6.0])
   (6.0,[7.0,7.0,7.0])
   (7.0,[8.0,8.0,8.0])
   (8.0,[9.0,9.0,9.0])
   (9.0,[10.0,10.0,10.0])
 
 And the answer will be
 (1.0,0.0)
 (2.0,0.0)
 (3.0,0.0)
 (4.0,0.0)
 (5.0,0.0)
 (6.0,0.0)
 (7.0,0.0)
 (8.0,0.0)
 (9.0,0.0)
 
 And in case my file content is
   (0.0,[2.0,2.0,2.0])
   (0.0,[3.0,3.0,3.0])
   (0.0,[4.0,4.0,4.0])
   (0.0,[5.0,5.0,5.0])
   (0.0,[6.0,6.0,6.0])
   (0.0,[7.0,7.0,7.0])
   (0.0,[8.0,8.0,8.0])
   (0.0,[9.0,9.0,9.0])
   (0.0,[10.0,10.0,10.0])
 
 the answer will be:
 (0.0,0.0)
 (0.0,0.0)
 (0.0,0.0)
 (0.0,0.0)
 (0.0,0.0)
 (0.0,0.0)
 (0.0,0.0)
 (0.0,0.0)
 (0.0,0.0)
 
 I expect to get the label predicted by the model.
 -- 
 Margus (margusja) Roo
 http://margus.roo.ee
 skype: margusja
 +372 51 480
 



Re: MLlib(Logistic Regression) + Spark Streaming.

2014-12-28 Thread Jeremy Freeman
Along with Xiangrui’s suggestion, we will soon be adding an implementation of 
Streaming Logistic Regression, which will be similar to the current version of 
Streaming Linear Regression and will continually update the model as new data 
arrive (JIRA). Hopefully this will be in v1.3.
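
In the meantime, a minimal PySpark sketch of Xiangrui's train-offline / predict-online suggestion; training_rdd (an RDD of LabeledPoints) and vector_stream (a DStream of feature vectors) are just placeholders:

from pyspark.mllib.classification import LogisticRegressionWithSGD

# train offline on a static, labeled RDD
model = LogisticRegressionWithSGD.train(training_rdd)

# apply the fitted model to each feature vector arriving on the stream
predictions = vector_stream.map(lambda features: model.predict(features))
predictions.pprint()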

— Jeremy

-
jeremyfreeman.net
@thefreemanlab

On Dec 15, 2014, at 2:50 PM, Xiangrui Meng men...@gmail.com wrote:

 If you want to train offline and predict online, you can use the
 current LR implementation to train a model and then apply
 model.predict on the dstream. -Xiangrui
 
 On Sun, Dec 7, 2014 at 6:30 PM, Nasir Khan nasirkhan.onl...@gmail.com wrote:
 I am new to Spark.
 Lets say I want to develop a machine learning model, trained by the normal
 method in MLlib. I want to use that model with a Logistic Regression
 classifier and predict on streaming data coming from a file or socket.
 
 
 Streaming data -> Logistic Regression -> binary label prediction.
 
 Is it possible, since there is no streaming logistic regression algo like
 streaming linear regression?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Logistic-Regression-Spark-Streaming-tp20564.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 



Re: MLlib + Streaming

2014-12-28 Thread Jeremy Freeman
Hi Fernando,

There’s currently no streaming ALS in Spark. I’m exploring a streaming singular 
value decomposition (JIRA) based on this paper 
(http://www.stat.osu.edu/~dmsl/thinSVDtracking.pdf), which might be one way to 
think about it.

There has also been some cool recent work explicitly on streaming ALS w/ SGD 
that we should look into 
(https://www.cs.utexas.edu/~cjohnson/ParallelCollabFilt.pdf).

— Jeremy

-
jeremyfreeman.net
@thefreemanlab

On Dec 23, 2014, at 2:47 PM, Fernando O. fot...@gmail.com wrote:

 Hey Xiangrui,
   
 Is there any plan to have a streaming compatible ALS version?
 
 Or if it's currently doable, is there any example?
 
 
 
 On Tue, Dec 23, 2014 at 4:31 PM, Xiangrui Meng men...@gmail.com wrote:
 We have streaming linear regression (since v1.1) and k-means (v1.2) in
 MLlib. You can check the user guide:
 
 http://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression
 http://spark.apache.org/docs/latest/mllib-clustering.html#streaming-clustering
 
 -Xiangrui
 
 On Tue, Dec 23, 2014 at 10:01 AM, Gianmarco De Francisci Morales
 g...@apache.org wrote:
  Hi,
 
  I have recently seen a demo of Spark where different pieces were put
  together (training via MLlib + deploying on Spark Streaming).
  I was wondering if MLlib currently works to directly train on Streaming.
  And, if so, what are the semantics of the algorithms?
  If not, would it be interesting to have ML algorithms developed for the
  streaming setting?
 
  Thanks,
  --
  Gianmarco
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 



meetup october 30-31st in SF

2014-10-08 Thread Jeremy Freeman
Hi all,

We’re organizing a meetup October 30-31st in downtown SF that might be of 
interest to the Spark community. The focus is on large-scale data analysis and 
its role in neuroscience. It will feature several active Spark developers and 
users, including Xiangrui Meng, Josh Rosen, Reza Zadeh, and Sandy Ryza. 

You can sign up here (registration is free):

https://www.eventbrite.com/e/codeneuro-tickets-13197330571?discount=spark

Hope some of you can make it!

— Jeremy

--
jeremyfreeman.net
@thefreemanlab



Re: Anaconda Spark AMI

2014-07-13 Thread Jeremy Freeman
Hi Ben,

This is great! I just spun up an EC2 cluster and tested basic pyspark  + 
ipython/numpy/scipy functionality, and all seems to be working so far. Will let 
you know if any issues arise.

We do a lot with pyspark + scientific computing, and for EC2 usage I think this 
is a terrific way to get the core libraries installed.

-- Jeremy

On Jul 12, 2014, at 4:25 PM, Benjamin Zaitlen quasi...@gmail.com wrote:

 Hi All,
 
 Thanks to Jey's help, I have a release AMI candidate for 
 spark-1.0/anaconda-2.0 integration.  It's currently limited to availability 
 in US-EAST: ami-3ecd0c56
 
 Give it a try if you have some time.  This should just work with spark 1.0:
 
 ./spark-ec2 -k my_key -i ~/.ssh/mykey.rsa  -a ami-3ecd0c56
 
 If you have suggestions or run into trouble please email,
 
 --Ben
 
 PS:  I found that writing a noop map function is a decent way to install pkgs 
 on worker nodes (though most scientific pkgs are pre-installed with anaconda):
 
 def subprocess_noop(x):
     import os
     os.system("/opt/anaconda/bin/conda install h5py")
     return 1
 
 install_noop = rdd.map(subprocess_noop)
 install_noop.count()
 
 
 On Thu, Jul 3, 2014 at 2:32 PM, Jey Kottalam j...@cs.berkeley.edu wrote:
 Hi Ben,
 
 Has the PYSPARK_PYTHON environment variable been set in
 spark/conf/spark-env.sh to the path of the new python binary?
 
 FYI, there's a /root/copy-dirs script that can be handy when updating
 files on an already-running cluster. You'll want to restart the spark
 cluster for the changes to take effect, as described at
 https://spark.apache.org/docs/latest/ec2-scripts.html
 
 Hope that helps,
 -Jey
 
 On Thu, Jul 3, 2014 at 11:54 AM, Benjamin Zaitlen quasi...@gmail.com wrote:
  Hi All,
 
  I'm a dev a Continuum and we are developing a fair amount of tooling around
  Spark.  A few days ago someone expressed interest in numpy+pyspark and
  Anaconda came up as a reasonable solution.
 
  I spent a number of hours yesterday trying to rework the base Spark AMI on
  EC2 but sadly was defeated by a number of errors.
 
  Aggregations seemed to choke -- whereas small takes executed as expected
  (errors are linked to the gist):
 
  sc.appName
  u'PySparkShell'
  sc._conf.getAll()
  [(u'spark.executor.extraLibraryPath', u'/root/ephemeral-hdfs/lib/native/'),
  (u'spark.executor.memory', u'6154m'), (u'spark.submit.pyFiles', u''),
  (u'spark.app.name', u'
  PySparkShell'), (u'spark.executor.extraClassPath',
  u'/root/ephemeral-hdfs/conf'), (u'spark.master',
  u'spark://.compute-1.amazonaws.com:7077')]
  file = sc.textFile("hdfs:///user/root/chekhov.txt")
  file.take(2)
  [u"Project Gutenberg's Plays by Chekhov, Second Series, by Anton Chekhov",
  u'']
 
  lines = file.filter(lambda x: len(x) > 0)
  lines.count()
  VARIOUS ERRORS DISCUSSED BELOW
 
  My first thought was that I could simply get away with including anaconda on
  the base AMI, point the path at /dir/anaconda/bin, and bake a new one.
  Doing so resulted in some strange py4j errors like the following:
 
  Py4JError: An error occurred while calling o17.partitions. Trace:
  py4j.Py4JException: Method partitions([]) does not exist
 
  At some point I also saw:
  SystemError: Objects/cellobject.c:24: bad argument to internal function
 
  which is really strange, possibly the result of a version mismatch?
 
  I had another thought of building spark from master on the AMI, leaving the
  spark directory in place, and removing the spark call from the modules list
  in spark-ec2 launch script. Unfortunately, this resulted in the following
  errors:
 
  https://gist.github.com/quasiben/da0f4778fbc87d02c088
 
  If a spark dev was willing to make some time in the near future, I'm sure
  she/he and I could sort out these issues and give the Spark community a
  python distro ready to go for numerical computing.  For instance, I'm not
  sure how pyspark calls out to launching a python session on a slave?  Is
  this done as root or as the hadoop user? (I believe I changed /etc/bashrc to
  point to my anaconda bin directory so it shouldn't really matter.)  Is there
  something special about the py4j zip included in the spark dir compared with the
  py4j in pypi?
 
  Thoughts?
 
  --Ben
 
 
 



Re: error loading large files in PySpark 0.9.0

2014-06-06 Thread Jeremy Freeman
Oh cool, thanks for the heads up! Especially for the Hadoop InputFormat
support. We recently wrote a custom hadoop input format so we can support
flat binary files
(https://github.com/freeman-lab/thunder/tree/master/scala/src/main/scala/thunder/util/io/hadoop),
and have been testing it in Scala. So I was following Nick's progress and
was eager to check this out when ready. Will let you guys know how it goes.

-- J



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049p7144.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: error loading large files in PySpark 0.9.0

2014-06-04 Thread Jeremy Freeman
Hey Matei,

Wanted to let you know this issue appears to be fixed in 1.0.0. Great work!

-- Jeremy



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049p6985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark on an HPC setup

2014-05-28 Thread Jeremy Freeman
Hi Sid,

We are successfully running Spark on an HPC, and it works great. Here's some info 
on our setup and approach.

We have a cluster with 256 nodes running Scientific Linux 6.3 and scheduled by 
Univa Grid Engine.  The environment also has a DDN GridScalar running GPFS and 
several EMC Isilon clusters serving NFS to the compute cluster.

We wrote a custom qsub job to spin up Spark dynamically on a user-designated 
quantity of nodes. The UGE scheduler first designates a set of nodes that will 
be used to run Spark. Once the nodes are available, we use start-master.sh 
script to launch a master, and send it the addresses of the other nodes. The 
master then starts the workers with start-all.sh. At that point, the Spark 
cluster is usable and remains active until the user issues a qdel, which 
triggers the stop-all.sh on the master, and takes down the cluster. 

This worked well for us because users can pick the number of nodes to suit 
their job, and multiple users can run their own Spark clusters on the same 
system (alongside other non-Spark jobs).

We don't use HDFS for the filesystem, instead relying on NFS and GPFS, and the 
cluster is not running Hadoop. In tests, we've seen similar performance between 
our set up, and using Spark w/ HDFS on EC2 with higher-end instances (matched 
roughly for memory and number of cores).

Unfortunately we can't open source the launch scripts because they contain 
proprietary UGE stuff, but I'm happy to try and answer any follow-up questions.

-- Jeremy

-
Jeremy Freeman, PhD
Neuroscientist
@thefreemanlab

On May 28, 2014, at 11:02 AM, Sidharth Kashyap sidharth.n.kash...@outlook.com 
wrote:

 Hi,
 
 Has anyone tried to get Spark working on an HPC setup?
 If yes, can you please share your learnings and how you went about doing it?
 
 An HPC setup typically comes bundled with dynamically allocated cluster and a 
 very efficient scheduler.
 
 Configuring Spark standalone in this mode of operation is challenging as the 
 Hadoop dependencies need to be eliminated and the cluster needs to be 
 configured on the fly.
 
 Thanks,
 Sid
 
 
 



Re: Computing cosine similiarity using pyspark

2014-05-27 Thread Jeremy Freeman
Hi Jamal,

One nice feature of PySpark is that you can easily use existing functions
from NumPy and SciPy inside your Spark code. For a simple example, the
following uses Spark's cartesian operation (which combines pairs of vectors
into tuples), followed by NumPy's corrcoef to compute the pearson
correlation coefficient between every pair of a set of vectors. The vectors
are an RDD of numpy arrays.

 from numpy import array, corrcoef

 data = sc.parallelize([array([1,2,3]),array([2,4,6.1]),array([3,2,1.1])])
 corrs = data.cartesian(data).map(lambda (x,y): corrcoef(x,y)[0,1]).collect()
 corrs
[1.0, 0.0086740991746, -0.99953863896044948 ...

This just returns a list of the correlation coefficients, you could also
add a key to each array, to keep track of which pair is which

 data_with_keys = sc.parallelize([(0,array([1,2,3])),(1,array([2,4,6.1])),(2,array([3,2,1.1]))])
 corrs_with_keys = data_with_keys.cartesian(data_with_keys).map(lambda ((k1,v1),(k2,v2)): ((k1,k2),corrcoef(v1,v2)[0,1])).collect()
 corrs_with_keys
[((0, 0), 1.0), ((0, 1), 0.0086740991746), ((0, 2),
-0.99953863896044948) ...

Finally, you could just replace corrcoef in either of the above
with scipy.spatial.distance.cosine to get your cosine similarity.
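
For example (a small sketch; note that SciPy's cosine function returns a cosine distance, so similarity is 1 minus that value):

 from numpy import array
 from scipy.spatial.distance import cosine

 data = sc.parallelize([array([1,2,3]), array([2,4,6.1]), array([3,2,1.1])])
 sims = data.cartesian(data).map(lambda (x,y): 1 - cosine(x, y)).collect()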

Hope that's useful, as Andrei said, the answer partly depends on exactly
what you're trying to do.

-- Jeremy


On Fri, May 23, 2014 at 2:41 PM, Andrei faithlessfri...@gmail.com wrote:

 Do you need cosine distance and correlation between vectors or between
 variables (elements of vector)? It would be helpful if you could tell us
 details of your task.


 On Thu, May 22, 2014 at 5:49 PM, jamal sasha jamalsha...@gmail.comwrote:

 Hi,
  I have a bunch of vectors like
 [0.1234,-0.231,0.23131]
  and so on.

 and  I want to compute cosine similarity and pearson correlation using
 pyspark..
 How do I do this?
 Any ideas?
 Thanks





spark ec2 error

2014-05-04 Thread Jeremy Freeman
Hi all,

A heads up in case others hit this and are confused… This nice addition
(https://github.com/apache/spark/pull/612) causes an error if running the
spark-ec2.py deploy script from a version other than master (e.g. 0.8.0).

The error occurs during launch, here:

...
Creating local config files...
configuring /etc/ganglia/gmond.conf
Traceback (most recent call last):
  File "./deploy_templates.py", line 89, in <module>
    text = text.replace("{{" + key + "}}", template_vars[key])
TypeError: expected a character buffer object
Deploying Spark config files...
chmod: cannot access `/root/spark/conf/spark-env.sh': No such file or
directory
...

And then several more errors because of missing variables (though the
cluster itself launches, there are several configuration problems, e.g. with
HDFS). 

deploy_templates fails because the new SPARK_MASTER_OPTS and
SPARK_WORKER_INSTANCES variables aren't set by earlier versions of spark-ec2.py,
which still use deploy_templates from https://github.com/mesos/spark-ec2.git -b
v2, where the new variables are expected.

Using the updated spark-ec2.py from master works fine.

-- Jeremy

-
Jeremy Freeman, PhD
Neuroscientist
@thefreemanlab



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-error-tp5323.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark ec2 error

2014-05-04 Thread Jeremy Freeman
Cool, glad to help! I just tested with 0.8.1 and 0.9.0 and both worked
perfectly, so seems to all be good.

-- Jeremy



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-error-tp5323p5329.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Initial job has not accepted any resources

2014-05-04 Thread Jeremy Freeman
Hey Pedro,

From which version of Spark were you running the spark-ec2.py script? You
might have run into the problem described here
(http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-error-td5323.html),
which Patrick just fixed up to ensure backwards compatibility.

With the bug, it would successfully complete deployment but prevent the
correct setting of various variables, so may have caused the errors you were
seeing, though I'm not positive.

I'd definitely try re-running the spark-ec2 script now.

-- Jeremy



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Initial-job-has-not-accepted-any-resources-tp5322p5335.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Combining RDD's columns

2014-04-18 Thread Jeremy Freeman
Hi Ian,

If I understand what you're after, you might find zip useful. From the docs:

Zips this RDD with another one, returning key-value pairs with the first 
element in each RDD, second element in each RDD, etc. Assumes that the two RDDs 
have the *same number of partitions* and the *same number of elements in each 
partition* (e.g. one was made through a map on the other).

Here's a toy example:

 val rdd1 = sc.parallelize(Array("name1", "name2", "name3"), 3)
 val rdd2 = sc.parallelize(Array("sign1", "sign2", "sign3"), 3)
 rdd1.collect()
Array[String] = Array(name1, name2, name3)
 rdd2.collect()
Array[String] = Array(sign1, sign2, sign3)
 rdd1.zip(rdd2).collect()
Array[(String, String)] = Array((name1,sign1), (name2,sign2), (name3,sign3))

In your case, you might have the first two RDDs calculated from some common raw 
data through a map.

-- Jeremy

-
Jeremy Freeman, PhD
Neuroscientist
@thefreemanlab

On Apr 19, 2014, at 12:59 AM, Ian Ferreira ianferre...@hotmail.com wrote:

 
 This may seem contrived, but suppose I wanted to create a collection of 
 single column RDD's that contain calculated values, so I want to cache 
 these to avoid re-calc.
 
 i.e.
 
 rdd1 = {Names}
 rdd2 = {Star Sign}
 rdd3 = {Age}
 
 Then I want to create a new virtual RDD that  is a collection of these RDD's 
 to create a multi-column RDD
 
 rddA = {Names, Age}
 rddB = {Names, Star Sign}
 
 I saw that rdd.union() merges rows, but is there anything that can combine columns?
 
 Cheers
 - Ian



Re: Scala vs Python performance differences

2014-04-14 Thread Jeremy Freeman
Hi Andrew,

I'm putting together some benchmarks for PySpark vs Scala. I'm focusing on
ML algorithms, as I'm particularly curious about the relative performance of
MLlib in Scala vs the Python MLlib API vs pure Python implementations. 

Will share real results as soon as I have them, but roughly, in our hands,
that 40% number is ballpark correct, at least for some basic operations (e.g
textFile, count, reduce).

-- Jeremy

-
Jeremy Freeman, PhD
Neuroscientist
@thefreemanlab



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Scala-vs-Python-performance-differences-tp4247p4261.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark on other parallel filesystems

2014-04-04 Thread Jeremy Freeman
We run Spark (in Standalone mode) on top of a network-mounted file system 
(NFS), rather than HDFS, and find it to work great. It required no modification 
or special configuration to set this up; as Matei says, we just point Spark to 
data using the file location.

-- Jeremy

On Apr 4, 2014, at 8:12 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 As long as the filesystem is mounted at the same path on every node, you 
 should be able to just run Spark and use a file:// URL for your files.
 
 The only downside with running it this way is that Lustre won’t expose data 
 locality info to Spark, the way HDFS does. That may not matter if it’s a 
 network-mounted file system though.
 
 Matei
 
 On Apr 4, 2014, at 4:56 PM, Venkat Krishnamurthy ven...@yarcdata.com wrote:
 
 All
 
 Are there any drawbacks or technical challenges (or any information, really) 
 related to using Spark directly on a global parallel filesystem  like 
 Lustre/GPFS? 
 
 Any idea of what would be involved in doing a minimal proof of concept? Is 
 it just possible to run Spark unmodified (without the HDFS substrate) for a 
 start, or will that not work at all? I do know that it’s possible to 
 implement Tachyon on Lustre and get the HDFS interface – just looking at 
 other options.
 
 Venkat
 



Re: Calling Spark enthusiasts in NYC

2014-03-31 Thread Jeremy Freeman
Happy to help with an NYC meet up (just emailed Andy). I recently moved to VA, 
but am back in NYC quite often, and have been turning several computational 
people at Columbia / NYU / Simons Foundation onto Spark; there'd definitely be 
interest in those communities.

-- Jeremy

-
jeremy freeman, phd
neuroscientist
@thefreemanlab

On Mar 31, 2014, at 2:31 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote:

 Nicholas, I'm in Boston and would be interested in a Spark group. Not
 sure if you know this -- there was a meetup that never got off the
 ground. Anyway, I'd be +1 for attending. Not sure what is involved in
 organizing. Seems a shame that a city like Boston doesn't have one.
 
 On Mon, Mar 31, 2014 at 2:02 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
 As in, I am interested in helping organize a Spark meetup in the Boston
 area.
 
 
 On Mon, Mar 31, 2014 at 2:00 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
 
 Well, since this thread has played out as it has, lemme throw in a
 shout-out for Boston.
 
 
 On Mon, Mar 31, 2014 at 1:52 PM, Chris Gore cdg...@cdgore.com wrote:
 
 We'd love to see a Spark user group in Los Angeles and connect with
 others working with it here.
 
 Ping me if you're in the LA area and use Spark at your company (
 ch...@retentionscience.com ).
 
 Chris
 
 Retention Science
 call: 734.272.3099
 visit: Site | like: Facebook | follow: Twitter
 
 On Mar 31, 2014, at 10:42 AM, Anurag Dodeja anu...@anuragdodeja.com
 wrote:
 
 How about Chicago?
 
 
 On Mon, Mar 31, 2014 at 12:38 PM, Nan Zhu zhunanmcg...@gmail.com wrote:
 
 Montreal or Toronto?
 
 
 On Mon, Mar 31, 2014 at 1:36 PM, Martin Goodson mar...@skimlinks.com
 wrote:
 
 How about London?
 
 
 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240
 
 
 On Mon, Mar 31, 2014 at 6:28 PM, Andy Konwinski
 andykonwin...@gmail.com wrote:
 
 Hi folks,
 
 We have seen a lot of community growth outside of the Bay Area and we
 are looking to help spur even more!
 
 For starters, the organizers of the Spark meetups here in the Bay Area
 want to help anybody that is interested in setting up a meetup in a new
 city.
 
 Some amazing Spark champions have stepped forward in Seattle,
 Vancouver, Boulder/Denver, and a few other areas already.
 
 Right now, we are looking to connect with you Spark enthusiasts in NYC
 about helping to run an inaugural Spark Meetup in your area.
 
 You can reply to me directly if you are interested and I can tell you
 about all of the resources we have to offer (speakers from the core
 community, a budget for food, help scheduling, etc.), and let's make 
 this
 happen!
 
 Andy
 
 
 
 
 
 
 



Re: error loading large files in PySpark 0.9.0

2014-03-24 Thread Jeremy Freeman
Thanks Matei, unfortunately doesn't seem to fix it. I tried batchSize = 10, 
100, as well as 1 (which should reproduce the 0.8.1 behavior?), and it stalls 
at the same point in each case.

-- Jeremy

-
jeremy freeman, phd
neuroscientist
@thefreemanlab

On Mar 23, 2014, at 9:56 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Hey Jeremy, what happens if you pass batchSize=10 as an argument to your 
 SparkContext? It tries to serialize that many objects together at a time, 
 which might be too much. By default the batchSize is 1024.
 
 Matei
 
 On Mar 23, 2014, at 10:11 AM, Jeremy Freeman freeman.jer...@gmail.com wrote:
 
 Hi all,
 
 Hitting a mysterious error loading large text files, specific to PySpark
 0.9.0.
 
 In PySpark 0.8.1, this works:
 
 data = sc.textFile("path/to/myfile")
 data.count()
 
 But in 0.9.0, it stalls. There are indications of completion up to:
 
 14/03/17 16:54:24 INFO TaskSetManager: Finished TID 4 in 1699 ms on X.X.X.X
 (progress: 15/537)
 14/03/17 16:54:24 INFO DAGScheduler: Completed ResultTask(5, 4)
 
 And then this repeats indefinitely
 
 14/03/17 16:54:24 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5,
 runningTasks: 144
 14/03/17 16:54:25 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5,
 runningTasks: 144
 
 Always stalls at the same place. There's nothing in stderr on the workers,
 but in stdout there are several of these messages:
 
 INFO PythonRDD: stdin writer to Python finished early
 
 So perhaps the real error is being suppressed as in
 https://spark-project.atlassian.net/browse/SPARK-1025
 
 Data is just rows of space-separated numbers, ~20GB, with 300k rows and 50k
 characters per row. Running on a private cluster with 10 nodes, 100GB / 16
 cores each, Python v 2.7.6.
 
 I doubt the data is corrupted as it works fine in Scala in 0.8.1 and 0.9.0,
 and in PySpark in 0.8.1. Happy to post the file, but it should repro for
 anything with these dimensions. It *might* be specific to long strings: I
 don't see it with fewer characters (10k) per row, but I also don't see it
 with many fewer rows but the same number of characters per row.
 
 Happy to try and provide more info / help debug!
 
 -- Jeremy
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 



error loading large files in PySpark 0.9.0

2014-03-23 Thread Jeremy Freeman
Hi all,

Hitting a mysterious error loading large text files, specific to PySpark
0.9.0.

In PySpark 0.8.1, this works:

data = sc.textFile("path/to/myfile")
data.count()

But in 0.9.0, it stalls. There are indications of completion up to:

14/03/17 16:54:24 INFO TaskSetManager: Finished TID 4 in 1699 ms on X.X.X.X
(progress: 15/537)
14/03/17 16:54:24 INFO DAGScheduler: Completed ResultTask(5, 4)

And then this repeats indefinitely

14/03/17 16:54:24 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5,
runningTasks: 144
14/03/17 16:54:25 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5,
runningTasks: 144

Always stalls at the same place. There's nothing in stderr on the workers,
but in stdout there are several of these messages:

INFO PythonRDD: stdin writer to Python finished early

So perhaps the real error is being suppressed as in
https://spark-project.atlassian.net/browse/SPARK-1025

Data is just rows of space-separated numbers, ~20GB, with 300k rows and 50k
characters per row. Running on a private cluster with 10 nodes, 100GB / 16
cores each, Python v 2.7.6.

I doubt the data is corrupted as it works fine in Scala in 0.8.1 and 0.9.0,
and in PySpark in 0.8.1. Happy to post the file, but it should repro for
anything with these dimensions. It *might* be specific to long strings: I
don't see it with fewer characters (10k) per row, but I also don't see it
with many fewer rows but the same number of characters per row.

Happy to try and provide more info / help debug!

-- Jeremy



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Machine Learning on streaming data

2014-03-20 Thread Jeremy Freeman
Thanks TD, happy to share my experience with MLLib + Spark Streaming 
integration.

Here's a gist with two examples I have working, one for 
StreamingLinearRegression and another for StreamingKMeans.

https://gist.github.com/freeman-lab/9672685

The goal in each case was to implement a streaming version of the algorithm, 
using as much as possible directly from MLLib. For Linear Regression this was 
straightforward, because the MLLib version already uses a (stochastic) update 
rule, which I just use to update the model inside a foreachRDD(), using each 
new batch of data. For KMeans, I used the model class from MLLib, but extended 
it to keep a running count for each cluster. I also had to re-implement a chunk 
of the core algorithm in the form of an update rule. Tighter integration in 
this case would, I think, require refactoring some of MLLib (e.g. to use 
something like this update function), but this works fine.

One unresolved issue: for these kinds of algorithms, the dimensionality of the 
data must be known in advance. Would be cool to automatically detect it based 
on the first record.
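
For reference, a PySpark sketch (for illustration only; the gist itself is in Scala, and labeled_point_stream is a placeholder) of the foreachRDD update pattern, with the dimensionality picked up lazily from the first batch:

from pyspark.mllib.regression import LinearRegressionWithSGD

state = {'weights': None}   # filled in from the first batch seen

def update(rdd):
    if rdd.count() > 0:
        if state['weights'] is None:
            # detect dimensionality from the first record seen
            state['weights'] = [0.0] * len(rdd.first().features)
        # warm-start the optimizer from the previous weights on each new batch
        model = LinearRegressionWithSGD.train(rdd, initialWeights=state['weights'])
        state['weights'] = model.weights

labeled_point_stream.foreachRDD(update)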

-- Jeremy

On Mar 19, 2014, at 9:03 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

 Yes, of course you can conceptually apply machine learning algorithm on Spark 
 Streaming. However the current MLLib does not yet have direct support for 
 Spark Streaming's DStream. However, since DStreams are essentially a sequence 
 of RDDs, you can apply MLLib algorithms on those RDDs. Take a look at 
 DStream.transform() and DStream.foreachRDD() operations, which allow you to 
 access the RDDs in a DStream. You can apply MLlib functions on them.
 
 Some people have attempted to make a tighter integration between MLLib and 
 Spark Streaming. Jeremy (cc'ed) can say more about his adventures. 
 
 TD
 
 
 On Sun, Mar 16, 2014 at 5:56 PM, Nasir Khan nasirkhan.onl...@gmail.com 
 wrote:
 Hi, I'm working on a project in which I have to take streaming URLs, filter them,
 and classify them as benign or suspicious. Now Machine Learning and Streaming
 are two separate things in Apache Spark (AFAIK). My question is: can we apply
 online machine learning algorithms on streams?
 
 I am at a beginner level. Kindly explain in a bit of detail, and if someone can
 direct me to some good material, that would be great.
 
 Thanks
 Nasir Khan.
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Machine-Learning-on-streaming-data-tp2732.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 



Re: example of non-line oriented input data?

2014-03-19 Thread Jeremy Freeman
Another vote on this, support for simple SequenceFiles and/or Avro would be 
terrific, as using plain text can be very space-inefficient, especially for 
numerical data.

-- Jeremy

On Mar 19, 2014, at 5:24 PM, Nicholas Chammas nicholas.cham...@gmail.com 
wrote:

 I'd second the request for Avro support in Python first, followed by Parquet.
 
 
 On Wed, Mar 19, 2014 at 2:14 PM, Evgeny Shishkin itparan...@gmail.com wrote:
 
 On 19 Mar 2014, at 19:54, Diana Carroll dcarr...@cloudera.com wrote:
 
 Actually, thinking more on this question, Matei: I'd definitely say support 
 for Avro.  There's a lot of interest in this!!
 
 
 Agree, and parquet as default Cloudera Impala format.
 
 
 
 
 On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 BTW one other thing — in your experience, Diana, which non-text InputFormats 
 would be most useful to support in Python first? Would it be Parquet or 
 Avro, simple SequenceFiles with the Hadoop Writable types, or something 
 else? I think a per-file text input format that does the stuff we did here 
 would also be good.
 
 Matei
 
 
 On Mar 18, 2014, at 3:27 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 
 Hi Diana,
 
 This seems to work without the iter() in front if you just return 
 treeiterator. What happened when you didn’t include that? Treeiterator 
 should return an iterator.
 
 Anyway, this is a good example of mapPartitions. It’s one where you want to 
 view the whole file as one object (one XML here), so you couldn’t implement 
 this using a flatMap, but you still want to return multiple values. The 
 MLlib example you saw needs Python 2.7 because unfortunately that is a 
 requirement for our Python MLlib support (see 
 http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries).
  We’d like to relax this later but we’re using some newer features of NumPy 
 and Python. The rest of PySpark works on 2.6.
 
 In terms of the size in memory, here both the string s and the XML tree 
 constructed from it need to fit in, so you can’t work on very large 
 individual XML files. You may be able to use a streaming XML parser instead 
 to extract elements from the data in a streaming fashion, without ever 
 materializing the whole tree. 
 http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader
  is one example.
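
 For instance, a rough sketch of that approach with ElementTree's iterparse, assuming each partition holds one whole XML document whose records are <country> elements, as in Diana's files (the raw string is still built here, but the element tree is discarded incrementally):

 from StringIO import StringIO
 import xml.etree.ElementTree as ET

 def parse_countries(iterator):
     doc = StringIO(''.join(iterator))
     for event, elem in ET.iterparse(doc):   # fires on each closing tag
         if elem.tag == 'country':
             yield elem.attrib
             elem.clear()                    # free the element once emitted

 # mydata.mapPartitions(parse_countries).collect()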
 
 Matei
 
 On Mar 18, 2014, at 7:49 AM, Diana Carroll dcarr...@cloudera.com wrote:
 
 Well, if anyone is still following this, I've gotten the following code 
 working which in theory should allow me to parse whole XML files: (the 
 problem was that I can't return the tree iterator directly.  I have to 
 call iter().  Why?)
 
 import xml.etree.ElementTree as ET
 
 # two source files, format: <data><country name="...">...</country></data>
 mydata = sc.textFile("file:/home/training/countries*.xml")
 
 def parsefile(iterator):
     s = ''
     for i in iterator: s = s + str(i)
     tree = ET.fromstring(s)
     treeiterator = tree.getiterator("country")
     # why do I have to convert an iterator to an iterator?  not sure but required
     return iter(treeiterator)
 
 mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()
 
 The output is what I expect:
 [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
 
 BUT I'm a bit concerned about the construction of the string s.  How big 
 can my file be before converting it to a string becomes problematic?
 
 
 
 On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll dcarr...@cloudera.com 
 wrote:
 Thanks, Matei.
 
 In the context of this discussion, it would seem mapPartitions is 
 essential, because it's the only way I'm going to be able to process each 
 file as a whole, in our example of a large number of small XML files which 
 need to be parsed as a whole file because records are not required to be 
 on a single line.
 
 The theory makes sense but I'm still utterly lost as to how to implement 
 it.  Unfortunately there's only a single example of the use of 
 mapPartitions in any of the Python example programs, which is the log 
 regression example, which I can't run because it requires Python 2.7 and 
 I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6 
 is unsupported...is it?)
 
 I'd really really love to see a real life example of a Python use of 
 mapPartitions.  I do appreciate the very simple examples you provided, but 
 (perhaps because of my novice status on Python) I can't figure out how to 
 translate those to a real world situation in which I'm building RDDs from 
 files, not inline collections like [(1,2),(2,3)].
 
 Also, you say that the function called in mapPartitions can return a 
 collection OR an iterator.  I tried returning an iterator by calling 
 ElementTree getiterator function, but still got the error telling me my 
 object was not an iterator. 
 
 If anyone has a real life example of mapPartitions returning a Python 
 iterator, that would be fabulous.
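
 For what it's worth, here is one small sketch of mapPartitions returning a Python iterator (a generator); the log path and regex are just placeholders, and the per-partition setup (compiling the regex once per partition rather than once per line) is the part mapPartitions buys you:

 import re

 def extract_ips(iterator):
     pattern = re.compile(r'\d+\.\d+\.\d+\.\d+')   # compiled once per partition
     for line in iterator:
         m = pattern.search(line)
         if m:
             yield m.group(0)                      # a generator is an iterator

 # ips = sc.textFile('file:/var/log/mylogs/*').mapPartitions(extract_ips)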