Re: Reading a large file (binary) into RDD
Hm, that will indeed be trickier, because this method assumes records are the same byte size. Is the file an arbitrary sequence of mixed types, or is there structure, e.g. short, long, short, long, etc.? If you could post a gist with an example of the kind of file and how it should look once read in, that would be useful!

- jeremyfreeman.net @thefreemanlab

On Apr 2, 2015, at 2:09 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

Thanks for the reply. Unfortunately, in my case, the binary file is a mix of short and long integers. Is there any other way that could be of use here? My current method has a large overhead (much more than the actual computation time). Also, I run short of memory at the driver when it has to read the entire file.

On Thu, Apr 2, 2015 at 1:44 PM, Jeremy Freeman freeman.jer...@gmail.com wrote:

If it’s a flat binary file and each record is the same length (in bytes), you can use Spark’s binaryRecords method (defined on the SparkContext), which loads records from one or more large flat binary files into an RDD. Here’s an example in Python to show how it works:

    # write data from an array
    from numpy import random
    dat = random.randn(100, 5)
    f = open('test.bin', 'w')
    f.write(dat)
    f.close()

    # load the data back in
    from numpy import frombuffer
    nrecords = 5
    bytesize = 8
    recordsize = nrecords * bytesize
    data = sc.binaryRecords('test.bin', recordsize)
    parsed = data.map(lambda v: frombuffer(buffer(v, 0, recordsize), 'float'))

    # these should be equal
    parsed.first()
    dat[0, :]

Does that help?

- jeremyfreeman.net @thefreemanlab

On Apr 2, 2015, at 1:33 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

What are some efficient ways to read a large file into RDDs? For example, have several executors read a specific/unique portion of the file and construct RDDs. Is this possible to do in Spark? Currently, I am doing a line-by-line read of the file at the driver and constructing the RDD.
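For the mixed-type case being discussed, here is a minimal, hedged sketch (not from the thread): it assumes every record shares one fixed layout, say a 2-byte short followed by an 8-byte integer, so binaryRecords can still be used with the total record size and a NumPy structured dtype handles the parsing. The file name and field layout are placeholders.

    from numpy import dtype, frombuffer

    # assumed layout: one little-endian short, then one little-endian 8-byte integer
    record_dtype = dtype([('a', '<i2'), ('b', '<i8')])
    recordsize = record_dtype.itemsize  # 10 bytes per record

    data = sc.binaryRecords('mixed.bin', recordsize)  # placeholder path
    parsed = data.map(lambda v: frombuffer(v, dtype=record_dtype)[0])
    parsed.first()  # one record, accessible by field name, e.g. parsed.first()['a']

If the shorts and longs are interleaved with no fixed per-record structure, this approach doesn't apply, which is why the question about structure above matters.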
Re: Reading a large file (binary) into RDD
If it’s a flat binary file and each record is the same length (in bytes), you can use Spark’s binaryRecords method (defined on the SparkContext), which loads records from one or more large flat binary files into an RDD. Here’s an example in Python to show how it works:

    # write data from an array
    from numpy import random
    dat = random.randn(100, 5)
    f = open('test.bin', 'w')
    f.write(dat)
    f.close()

    # load the data back in
    from numpy import frombuffer
    nrecords = 5
    bytesize = 8
    recordsize = nrecords * bytesize
    data = sc.binaryRecords('test.bin', recordsize)
    parsed = data.map(lambda v: frombuffer(buffer(v, 0, recordsize), 'float'))

    # these should be equal
    parsed.first()
    dat[0, :]

Does that help?

- jeremyfreeman.net @thefreemanlab

On Apr 2, 2015, at 1:33 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

What are some efficient ways to read a large file into RDDs? For example, have several executors read a specific/unique portion of the file and construct RDDs. Is this possible to do in Spark? Currently, I am doing a line-by-line read of the file at the driver and constructing the RDD.
Re: Can LBFGS be used on streaming data?
Regarding the first question, can you say more about how you are loading your data? And what is the size of the data set? Is that the only error you see, and do you only see it in the streaming version?

For the second question, there are a couple of reasons the weights might differ slightly; it depends on exactly how you set up the comparison. When you split the data into 5, were those the same 5 chunks of data you used for the streaming case? And were they presented to the optimizer in the same order? A difference in either could produce small differences in the resulting weights, but that doesn’t mean it’s doing anything wrong.

- jeremyfreeman.net @thefreemanlab

On Mar 17, 2015, at 6:19 PM, EcoMotto Inc. ecomot...@gmail.com wrote:

Hello Jeremy,

Thank you for your reply. When I am running this code on the local machine on streaming data, it keeps giving me this error:

    WARN TaskSetManager: Lost task 2.0 in stage 211.0 (TID 4138, localhost): java.io.FileNotFoundException: /tmp/spark-local-20150316165742-9ac0/27/shuffle_102_2_0.data (No such file or directory)

And when I execute the same code on static data after randomly splitting it into 5 sets, it gives me slightly different weights (the difference is in the decimals). I am still trying to analyse why this would be happening. Any input on why this would be happening?

Best Regards, Arunkumar

On Tue, Mar 17, 2015 at 11:32 AM, Jeremy Freeman freeman.jer...@gmail.com wrote:

Hi Arunkumar,

That looks like it should work. Logically, it’s similar to the implementation used by StreamingLinearRegression and StreamingLogisticRegression, see this class:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala

which exposes the kind of operation you're describing (for any linear method). The nice thing about the gradient-based methods is that they can use existing MLlib optimization routines in this fairly direct way. Other methods (such as KMeans) require a bit more reengineering.

— Jeremy

- jeremyfreeman.net @thefreemanlab

On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. ecomot...@gmail.com wrote:

Hello,

I am new to the Spark streaming API. I wanted to ask if I can apply LBFGS (with LeastSquaresGradient) on streaming data? Currently I am using foreachRDD for parsing through the DStream and I am generating a model based on each RDD. Am I doing anything logically wrong here? Thank you.

Sample Code:

    val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
    var initialWeights = Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
    var isFirst = true
    var model = new LinearRegressionModel(null, 1.0)

    parsedData.foreachRDD { rdd =>
      if (isFirst) {
        val weights = algorithm.optimize(rdd, initialWeights)
        val w = weights.toArray
        val intercept = w.head
        model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
        isFirst = false
      } else {
        var ab = ArrayBuffer[Double]()
        ab.insert(0, model.intercept)
        ab.appendAll(model.weights.toArray)
        print("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
        initialWeights = Vectors.dense(ab.toArray)
        print("Initial Weights: " + initialWeights)
        val weights = algorithm.optimize(rdd, initialWeights)
        val w = weights.toArray
        val intercept = w.head
        model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
      }
    }

Best Regards, Arunkumar
Re: Can LBFGS be used on streaming data?
Hi Arunkumar,

That looks like it should work. Logically, it’s similar to the implementation used by StreamingLinearRegression and StreamingLogisticRegression, see this class:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala

which exposes the kind of operation you're describing (for any linear method). The nice thing about the gradient-based methods is that they can use existing MLlib optimization routines in this fairly direct way. Other methods (such as KMeans) require a bit more reengineering.

— Jeremy

- jeremyfreeman.net @thefreemanlab

On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. ecomot...@gmail.com wrote:

Hello,

I am new to the Spark streaming API. I wanted to ask if I can apply LBFGS (with LeastSquaresGradient) on streaming data? Currently I am using foreachRDD for parsing through the DStream and I am generating a model based on each RDD. Am I doing anything logically wrong here? Thank you.

Sample Code:

    val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
    var initialWeights = Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
    var isFirst = true
    var model = new LinearRegressionModel(null, 1.0)

    parsedData.foreachRDD { rdd =>
      if (isFirst) {
        val weights = algorithm.optimize(rdd, initialWeights)
        val w = weights.toArray
        val intercept = w.head
        model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
        isFirst = false
      } else {
        var ab = ArrayBuffer[Double]()
        ab.insert(0, model.intercept)
        ab.appendAll(model.weights.toArray)
        print("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
        initialWeights = Vectors.dense(ab.toArray)
        print("Initial Weights: " + initialWeights)
        val weights = algorithm.optimize(rdd, initialWeights)
        val w = weights.toArray
        val intercept = w.head
        model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
      }
    }

Best Regards, Arunkumar
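For anyone wanting to try the same refit-per-batch pattern from PySpark, here is a rough, hedged sketch (not from the thread): it swaps in LinearRegressionWithSGD, since the snippet above uses the Scala LBFGS optimizer directly, and it assumes parsedData is a DStream of LabeledPoints with a known number of features.

    from numpy import zeros
    from pyspark.mllib.regression import LinearRegressionWithSGD

    numFeatures = 3                          # assumed dimensionality
    state = {'weights': zeros(numFeatures)}  # driver-side state, warm start for each batch

    def refit(rdd):
        if rdd.count() > 0:
            model = LinearRegressionWithSGD.train(rdd, initialWeights=state['weights'])
            state['weights'] = model.weights
            print("weights: %s" % state['weights'])

    parsedData.foreachRDD(refit)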
Re: Streaming linear regression example question
Hi Margus, thanks for reporting this. I’ve been able to reproduce it and there does indeed appear to be a bug. I’ve created a JIRA and have a fix ready; hopefully it can be included in 1.3.1. In the meantime, you can get the desired result using transform:

    model.trainOn(trainingData)
    testingData.transform { rdd =>
      val latest = model.latestModel()
      rdd.map(lp => (lp.label, latest.predict(lp.features)))
    }.print()

- jeremyfreeman.net @thefreemanlab

On Mar 15, 2015, at 2:56 PM, Margus Roo mar...@roo.ee wrote:

Hi again

Tried the same examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala from 1.3.0. In case the testing file content is:

    (0.0,[3.0,4.0,3.0]) (0.0,[4.0,4.0,4.0]) (4.0,[5.0,5.0,5.0]) (5.0,[5.0,6.0,6.0]) (6.0,[7.0,4.0,7.0]) (7.0,[8.0,6.0,8.0]) (8.0,[44.0,9.0,9.0]) (9.0,[14.0,30.0,10.0])

the answer is:

    (0.0,0.0) (0.0,0.0) (0.0,0.0) (4.0,0.0) (5.0,0.0) (6.0,0.0) (7.0,0.0) (8.0,0.0) (9.0,0.0)

What is wrong? I can see that the model's weights are changing when I put new data into the training dir.

Margus (margusja) Roo http://margus.roo.ee skype: margusja +372 51 480

On 14/03/15 09:05, Margus Roo wrote:

Hi

I am trying to understand the example provided in https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html - Streaming linear regression.

Code:

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.DStream

    object StreamingLinReg {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))

        val trainingData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/").map(LabeledPoint.parse).cache()
        val testData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/").map(LabeledPoint.parse)

        val numFeatures = 3
        val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))

        model.trainOn(trainingData)
        model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Compiled the code and ran it. Put a file containing

    (1.0,[2.0,2.0,2.0]) (2.0,[3.0,3.0,3.0]) (3.0,[4.0,4.0,4.0]) (4.0,[5.0,5.0,5.0]) (5.0,[6.0,6.0,6.0]) (6.0,[7.0,7.0,7.0]) (7.0,[8.0,8.0,8.0]) (8.0,[9.0,9.0,9.0]) (9.0,[10.0,10.0,10.0])

into the training directory. I can see that the model's weights change:

    15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: weights, [7.333,7.333,7.333]

Now I can put whatever I like into the testing directory, but I cannot understand the answer. For example, I can put the same file I used for training into the testing directory. The file content is

    (1.0,[2.0,2.0,2.0]) (2.0,[3.0,3.0,3.0]) (3.0,[4.0,4.0,4.0]) (4.0,[5.0,5.0,5.0]) (5.0,[6.0,6.0,6.0]) (6.0,[7.0,7.0,7.0]) (7.0,[8.0,8.0,8.0]) (8.0,[9.0,9.0,9.0]) (9.0,[10.0,10.0,10.0])

and the answer will be

    (1.0,0.0) (2.0,0.0) (3.0,0.0) (4.0,0.0) (5.0,0.0) (6.0,0.0) (7.0,0.0) (8.0,0.0) (9.0,0.0)

And in case my file content is

    (0.0,[2.0,2.0,2.0]) (0.0,[3.0,3.0,3.0]) (0.0,[4.0,4.0,4.0]) (0.0,[5.0,5.0,5.0]) (0.0,[6.0,6.0,6.0]) (0.0,[7.0,7.0,7.0]) (0.0,[8.0,8.0,8.0]) (0.0,[9.0,9.0,9.0]) (0.0,[10.0,10.0,10.0])

the answer will be:

    (0.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,0.0)

I expect to get the label predicted by the model.
-- Margus (margusja) Roo http://margus.roo.ee skype: margusja +372 51 480
Re: MLlib(Logistic Regression) + Spark Streaming.
Along with Xiangrui’s suggestion, we will soon be adding an implementation of Streaming Logistic Regression, which will be similar to the current version of Streaming Linear Regression, and will continually update the model as new data arrive (JIRA). Hopefully this will be in v1.3.

— Jeremy

- jeremyfreeman.net @thefreemanlab

On Dec 15, 2014, at 2:50 PM, Xiangrui Meng men...@gmail.com wrote:

If you want to train offline and predict online, you can use the current LR implementation to train a model and then apply model.predict on the DStream. -Xiangrui

On Sun, Dec 7, 2014 at 6:30 PM, Nasir Khan nasirkhan.onl...@gmail.com wrote:

I am new to Spark. Let's say I want to develop a machine learning model, trained by the normal method in MLlib. I want to use that model with the Logistic Regression classifier and predict on streaming data coming from a file or socket.

Streaming data -> Logistic Regression -> binary label prediction.

Is this possible? Since there is no streaming logistic regression algorithm like streaming linear regression.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Logistic-Regression-Spark-Streaming-tp20564.html Sent from the Apache Spark User List mailing list archive at Nabble.com.

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
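A minimal sketch of the train-offline / predict-online pattern Xiangrui describes, hedged: the file path, record format, and DStream source are all placeholders, and the parsing is purely illustrative.

    from pyspark.mllib.classification import LogisticRegressionWithSGD
    from pyspark.mllib.regression import LabeledPoint

    # train offline on a static RDD (format assumed: "label,f1 f2 f3")
    def parse(line):
        label, features = line.split(',')
        return LabeledPoint(float(label), [float(x) for x in features.split()])

    model = LogisticRegressionWithSGD.train(sc.textFile('training.txt').map(parse))

    # predict online, assuming featureStream is a DStream of feature vectors
    predictions = featureStream.map(lambda features: model.predict(features))
    predictions.pprint()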
Re: MLlib + Streaming
Hi Fernando, There’s currently no streaming ALS in Spark. I’m exploring a streaming singular value decomposition (JIRA) based on this paper (http://www.stat.osu.edu/~dmsl/thinSVDtracking.pdf), which might be one way to think about it. There has also been some cool recent work explicitly on streaming ALS w/ SGD that we should look into (https://www.cs.utexas.edu/~cjohnson/ParallelCollabFilt.pdf). — Jeremy - jeremyfreeman.net @thefreemanlab On Dec 23, 2014, at 2:47 PM, Fernando O. fot...@gmail.com wrote: Hey Xiangrui, Is there any plan to have a streaming compatible ALS version? Or if it's currently doable, is there any example? On Tue, Dec 23, 2014 at 4:31 PM, Xiangrui Meng men...@gmail.com wrote: We have streaming linear regression (since v1.1) and k-means (v1.2) in MLlib. You can check the user guide: http://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression http://spark.apache.org/docs/latest/mllib-clustering.html#streaming-clustering -Xiangrui On Tue, Dec 23, 2014 at 10:01 AM, Gianmarco De Francisci Morales g...@apache.org wrote: Hi, I have recently seen a demo of Spark where different pieces were put together (training via MLlib + deploying on Spark Streaming). I was wondering if MLlib currently works to directly train on Streaming. And, if so, what are the semantics of the algorithms? If not, would it be interesting to have ML algorithms developed for the streaming setting? Thanks, -- Gianmarco - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
meetup october 30-31st in SF
Hi all, We’re organizing a meetup October 30-31st in downtown SF that might be of interest to the Spark community. The focus is on large-scale data analysis and its role in neuroscience. It will feature several active Spark developers and users, including Xiangrui Meng, Josh Rosen, Reza Zadeh, and Sandy Ryza. You can sign up here (registration is free): https://www.eventbrite.com/e/codeneuro-tickets-13197330571?discount=spark Hope some of you can make it! — Jeremy -- jeremyfreeman.net @thefreemanlab
Re: Anaconda Spark AMI
Hi Ben,

This is great! I just spun up an EC2 cluster and tested basic pyspark + ipython/numpy/scipy functionality, and all seems to be working so far. Will let you know if any issues arise. We do a lot with pyspark + scientific computing, and for EC2 usage I think this is a terrific way to get the core libraries installed.

-- Jeremy

On Jul 12, 2014, at 4:25 PM, Benjamin Zaitlen quasi...@gmail.com wrote:

Hi All,

Thanks to Jey's help, I have a release AMI candidate for spark-1.0/anaconda-2.0 integration. It's currently limited to availability in US-EAST: ami-3ecd0c56

Give it a try if you have some time. This should just work with spark 1.0:

    ./spark-ec2 -k my_key -i ~/.ssh/mykey.rsa -a ami-3ecd0c56

If you have suggestions or run into trouble please email,

--Ben

PS: I found that writing a noop map function is a decent way to install pkgs on worker nodes (though most scientific pkgs are pre-installed with anaconda):

    def subprocess_noop(x):
        import os
        os.system("/opt/anaconda/bin/conda install h5py")
        return 1

    install_noop = rdd.map(subprocess_noop)
    install_noop.count()

On Thu, Jul 3, 2014 at 2:32 PM, Jey Kottalam j...@cs.berkeley.edu wrote:

Hi Ben,

Has the PYSPARK_PYTHON environment variable been set in spark/conf/spark-env.sh to the path of the new python binary? FYI, there's a /root/copy-dirs script that can be handy when updating files on an already-running cluster. You'll want to restart the spark cluster for the changes to take effect, as described at https://spark.apache.org/docs/latest/ec2-scripts.html

Hope that helps, -Jey

On Thu, Jul 3, 2014 at 11:54 AM, Benjamin Zaitlen quasi...@gmail.com wrote:

Hi All,

I'm a dev at Continuum and we are developing a fair amount of tooling around Spark. A few days ago someone expressed interest in numpy+pyspark, and Anaconda came up as a reasonable solution.

I spent a number of hours yesterday trying to rework the base Spark AMI on EC2 but sadly was defeated by a number of errors. Aggregations seemed to choke, whereas small takes executed as expected (errors are linked to the gist):

    >>> sc.appName
    u'PySparkShell'
    >>> sc._conf.getAll()
    [(u'spark.executor.extraLibraryPath', u'/root/ephemeral-hdfs/lib/native/'), (u'spark.executor.memory', u'6154m'), (u'spark.submit.pyFiles', u''), (u'spark.app.name', u'PySparkShell'), (u'spark.executor.extraClassPath', u'/root/ephemeral-hdfs/conf'), (u'spark.master', u'spark://.compute-1.amazonaws.com:7077')]
    >>> file = sc.textFile("hdfs:///user/root/chekhov.txt")
    >>> file.take(2)
    [u"Project Gutenberg's Plays by Chekhov, Second Series, by Anton Chekhov", u'']
    >>> lines = file.filter(lambda x: len(x) > 0)
    >>> lines.count()
    VARIOUS ERRORS DISCUSSED BELOW

My first thought was that I could simply get away with including anaconda on the base AMI, point the path at /dir/anaconda/bin, and bake a new one. Doing so resulted in some strange py4j errors like the following:

    Py4JError: An error occurred while calling o17.partitions. Trace: py4j.Py4JException: Method partitions([]) does not exist

At some point I also saw:

    SystemError: Objects/cellobject.c:24: bad argument to internal function

which is really strange, possibly the result of a version mismatch? I had another thought of building spark from master on the AMI, leaving the spark directory in place, and removing the spark call from the modules list in the spark-ec2 launch script.
Unfortunately, this resulted in the following errors: https://gist.github.com/quasiben/da0f4778fbc87d02c088

If a spark dev was willing to make some time in the near future, I'm sure she/he and I could sort out these issues and give the Spark community a python distro ready to go for numerical computing. For instance, I'm not sure how pyspark calls out to launching a python session on a slave? Is this done as root or as the hadoop user? (I believe I changed /etc/bashrc to point to my anaconda bin directory so it shouldn't really matter.) Is there something special about the py4j zip included in the spark dir compared with the py4j in pypi?

Thoughts?

--Ben
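Riffing on Ben's noop-map trick above, a hedged variant (package name and partition count are placeholders) that runs the install once per partition instead of once per element, by using mapPartitions with a generator:

    def install_packages(iterator):
        # runs once per partition, on whichever worker holds it
        import os
        os.system("/opt/anaconda/bin/conda install -y h5py")
        yield 1

    # use at least as many partitions as there are worker cores so every node is likely hit
    sc.parallelize(range(100), 100).mapPartitions(install_packages).count()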
Re: error loading large files in PySpark 0.9.0
Oh cool, thanks for the heads up! Especially for the Hadoop InputFormat support. We recently wrote a custom hadoop input format so we can support flat binary files (https://github.com/freeman-lab/thunder/tree/master/scala/src/main/scala/thunder/util/io/hadoop), and have been testing it in Scala. So I was following Nick's progress and was eager to check this out when ready. Will let you guys know how it goes. -- J -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049p7144.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: error loading large files in PySpark 0.9.0
Hey Matei, Wanted to let you know this issue appears to be fixed in 1.0.0. Great work! -- Jeremy -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049p6985.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark on an HPC setup
Hi Sid,

We are successfully running Spark on an HPC, and it works great. Here's info on our setup / approach.

We have a cluster with 256 nodes running Scientific Linux 6.3 and scheduled by Univa Grid Engine. The environment also has a DDN GridScalar running GPFS and several EMC Isilon clusters serving NFS to the compute cluster.

We wrote a custom qsub job to spin up Spark dynamically on a user-designated quantity of nodes. The UGE scheduler first designates a set of nodes that will be used to run Spark. Once the nodes are available, we use the start-master.sh script to launch a master, and send it the addresses of the other nodes. The master then starts the workers with start-all.sh. At that point, the Spark cluster is usable and remains active until the user issues a qdel, which triggers stop-all.sh on the master and takes down the cluster.

This worked well for us because users can pick the number of nodes to suit their job, and multiple users can run their own Spark clusters on the same system (alongside other non-Spark jobs). We don't use HDFS for the filesystem, instead relying on NFS and GPFS, and the cluster is not running Hadoop. In tests, we've seen similar performance between our setup and Spark w/ HDFS on EC2 with higher-end instances (matched roughly for memory and number of cores).

Unfortunately we can't open source the launch scripts because they contain proprietary UGE stuff, but happy to try and answer any follow-up questions.

-- Jeremy

- Jeremy Freeman, PhD Neuroscientist @thefreemanlab

On May 28, 2014, at 11:02 AM, Sidharth Kashyap sidharth.n.kash...@outlook.com wrote:

Hi,

Has anyone tried to get Spark working on an HPC setup? If yes, can you please share your learnings and how you went about doing it?

An HPC setup typically comes bundled with a dynamically allocated cluster and a very efficient scheduler. Configuring Spark standalone in this mode of operation is challenging, as the Hadoop dependencies need to be eliminated and the cluster needs to be configured on the fly.

Thanks, Sid
Re: Computing cosine similiarity using pyspark
Hi Jamal,

One nice feature of PySpark is that you can easily use existing functions from NumPy and SciPy inside your Spark code. For a simple example, the following uses Spark's cartesian operation (which combines pairs of vectors into tuples), followed by NumPy's corrcoef, to compute the Pearson correlation coefficient between every pair of a set of vectors. The vectors are an RDD of numpy arrays.

    from numpy import array, corrcoef
    data = sc.parallelize([array([1,2,3]), array([2,4,6.1]), array([3,2,1.1])])
    corrs = data.cartesian(data).map(lambda (x,y): corrcoef(x,y)[0,1]).collect()
    corrs
    [1.0, 0.0086740991746, -0.99953863896044948 ...

This just returns a list of the correlation coefficients. You could also add a key to each array, to keep track of which pair is which:

    data_with_keys = sc.parallelize([(0,array([1,2,3])), (1,array([2,4,6.1])), (2,array([3,2,1.1]))])
    corrs_with_keys = data_with_keys.cartesian(data_with_keys).map(lambda ((k1,v1),(k2,v2)): ((k1,k2), corrcoef(v1,v2)[0,1])).collect()
    corrs_with_keys
    [((0, 0), 1.0), ((0, 1), 0.0086740991746), ((0, 2), -0.99953863896044948) ...

Finally, you could just replace corrcoef in either of the above with scipy.spatial.distance.cosine to get your cosine similarity. Hope that's useful; as Andrei said, the answer partly depends on exactly what you're trying to do.

-- Jeremy

On Fri, May 23, 2014 at 2:41 PM, Andrei faithlessfri...@gmail.com wrote:

Do you need cosine distance and correlation between vectors or between variables (elements of a vector)? It would be helpful if you could tell us the details of your task.

On Thu, May 22, 2014 at 5:49 PM, jamal sasha jamalsha...@gmail.com wrote:

Hi, I have a bunch of vectors like [0.1234,-0.231,0.23131] and so on, and I want to compute cosine similarity and Pearson correlation using pyspark. How do I do this? Any ideas? Thanks
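A hedged sketch of that last suggestion, swapping scipy.spatial.distance.cosine in for corrcoef; note that SciPy returns a cosine *distance*, so the similarity is one minus it. The data are the same toy vectors as above.

    from numpy import array
    from scipy.spatial.distance import cosine

    data = sc.parallelize([array([1,2,3]), array([2,4,6.1]), array([3,2,1.1])])
    sims = data.cartesian(data).map(lambda (x, y): 1 - cosine(x, y)).collect()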
spark ec2 error
Hi all,

A heads up in case others hit this and are confused… This nice addition https://github.com/apache/spark/pull/612 causes an error if running the spark-ec2.py deploy script from a version other than master (e.g. 0.8.0). The error occurs during launch, here:

    ...
    Creating local config files...
    configuring /etc/ganglia/gmond.conf
    Traceback (most recent call last):
      File "./deploy_templates.py", line 89, in <module>
        text = text.replace("{{" + key + "}}", template_vars[key])
    TypeError: expected a character buffer object
    Deploying Spark config files...
    chmod: cannot access `/root/spark/conf/spark-env.sh': No such file or directory
    ...

And then several more errors follow because of missing variables (the cluster itself launches, but there are several configuration problems, e.g. with HDFS). deploy_templates fails because the older spark-ec2.py doesn't set the new SPARK_MASTER_OPTS and SPARK_WORKER_INSTANCES variables, while it still pulls deploy_templates from https://github.com/mesos/spark-ec2.git -b v2, which expects them. Using the updated spark-ec2.py from master works fine.

-- Jeremy

- Jeremy Freeman, PhD Neuroscientist @thefreemanlab

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-error-tp5323.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark ec2 error
Cool, glad to help! I just tested with 0.8.1 and 0.9.0 and both worked perfectly, so seems to all be good. -- Jeremy -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-error-tp5323p5329.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Initial job has not accepted any resources
Hey Pedro, From which version of Spark were you running the spark-ec2.py script? You might have run into the problem described here (http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-error-td5323.html), which Patrick just fixed up to ensure backwards compatibility. With the bug, it would successfully complete deployment but prevent the correct setting of various variables, so may have caused the errors you were seeing, though I'm not positive. I'd definitely try re-running the spark-ec2 script now. -- Jeremy -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Initial-job-has-not-accepted-any-resources-tp5322p5335.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Combining RDD's columns
Hi Ian,

If I understand what you're after, you might find zip useful. From the docs:

"Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the *same number of partitions* and the *same number of elements in each partition* (e.g. one was made through a map on the other)."

Here's a toy example:

    val rdd1 = sc.parallelize(Array("name1", "name2", "name3"), 3)
    val rdd2 = sc.parallelize(Array("sign1", "sign2", "sign3"), 3)
    rdd1.collect()
    Array[String] = Array(name1, name2, name3)
    rdd2.collect()
    Array[String] = Array(sign1, sign2, sign3)
    rdd1.zip(rdd2).collect()
    Array[(String, String)] = Array((name1,sign1), (name2,sign2), (name3,sign3))

In your case, you might have the first two RDDs calculated from some common raw data through a map.

-- Jeremy

- Jeremy Freeman, PhD Neuroscientist @thefreemanlab

On Apr 19, 2014, at 12:59 AM, Ian Ferreira ianferre...@hotmail.com wrote:

This may seem contrived but, suppose I wanted to create a collection of single column RDD's that contain calculated values, so I want to cache these to avoid re-calc. i.e.

    rdd1 = {Names}
    rdd2 = {Star Sign}
    rdd3 = {Age}

Then I want to create a new virtual RDD that is a collection of these RDD's to create a multi-column RDD

    rddA = {Names, Age}
    rddB = {Names, Star Sign}

I saw that rdd.union() merges rows, but anything that can combine columns?

Cheers
- Ian
Re: Scala vs Python performance differences
Hi Andrew, I'm putting together some benchmarks for PySpark vs Scala. I'm focusing on ML algorithms, as I'm particularly curious about the relative performance of MLlib in Scala vs the Python MLlib API vs pure Python implementations. Will share real results as soon as I have them, but roughly, in our hands, that 40% number is ballpark correct, at least for some basic operations (e.g textFile, count, reduce). -- Jeremy - Jeremy Freeman, PhD Neuroscientist @thefreemanlab -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Scala-vs-Python-performance-differences-tp4247p4261.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark on other parallel filesystems
We run Spark (in Standalone mode) on top of a network-mounted file system (NFS), rather than HDFS, and find it to work great. It required no modification or special configuration to set this up; as Matei says, we just point Spark to data using the file location. -- Jeremy On Apr 4, 2014, at 8:12 PM, Matei Zaharia matei.zaha...@gmail.com wrote: As long as the filesystem is mounted at the same path on every node, you should be able to just run Spark and use a file:// URL for your files. The only downside with running it this way is that Lustre won’t expose data locality info to Spark, the way HDFS does. That may not matter if it’s a network-mounted file system though. Matei On Apr 4, 2014, at 4:56 PM, Venkat Krishnamurthy ven...@yarcdata.com wrote: All Are there any drawbacks or technical challenges (or any information, really) related to using Spark directly on a global parallel filesystem like Lustre/GPFS? Any idea of what would be involved in doing a minimal proof of concept? Is it just possible to run Spark unmodified (without the HDFS substrate) for a start, or will that not work at all? I do know that it’s possible to implement Tachyon on Lustre and get the HDFS interface – just looking at other options. Venkat
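For concreteness, a hedged one-liner of what "pointing Spark at the file location" looks like in this kind of setup; the mount path is a placeholder and must be identical on every node.

    data = sc.textFile("file:///mnt/nfs/shared/mydata.txt")  # placeholder path, same mount everywhere
    data.count()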
Re: Calling Spark enthusiasts in NYC
Happy to help with an NYC meet up (just emailed Andy). I recently moved to VA, but am back in NYC quite often, and have been turning several computational people at Columbia / NYU / Simons Foundation onto Spark; there'd definitely be interest in those communities. -- Jeremy - jeremy freeman, phd neuroscientist @thefreemanlab On Mar 31, 2014, at 2:31 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Nicholas, I'm in Boston and would be interested in a Spark group. Not sure if you know this -- there was a meetup that never got off the ground. Anyway, I'd be +1 for attending. Not sure what is involved in organizing. Seems a shame that a city like Boston doesn't have one. On Mon, Mar 31, 2014 at 2:02 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: As in, I am interested in helping organize a Spark meetup in the Boston area. On Mon, Mar 31, 2014 at 2:00 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Well, since this thread has played out as it has, lemme throw in a shout-out for Boston. On Mon, Mar 31, 2014 at 1:52 PM, Chris Gore cdg...@cdgore.com wrote: We'd love to see a Spark user group in Los Angeles and connect with others working with it here. Ping me if you're in the LA area and use Spark at your company ( ch...@retentionscience.com ). Chris Retention Science call: 734.272.3099 visit: Site | like: Facebook | follow: Twitter On Mar 31, 2014, at 10:42 AM, Anurag Dodeja anu...@anuragdodeja.com wrote: How about Chicago? On Mon, Mar 31, 2014 at 12:38 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Montreal or Toronto? On Mon, Mar 31, 2014 at 1:36 PM, Martin Goodson mar...@skimlinks.com wrote: How about London? -- Martin Goodson | VP Data Science (0)20 3397 1240 image.png On Mon, Mar 31, 2014 at 6:28 PM, Andy Konwinski andykonwin...@gmail.com wrote: Hi folks, We have seen a lot of community growth outside of the Bay Area and we are looking to help spur even more! For starters, the organizers of the Spark meetups here in the Bay Area want to help anybody that is interested in setting up a meetup in a new city. Some amazing Spark champions have stepped forward in Seattle, Vancouver, Boulder/Denver, and a few other areas already. Right now, we are looking to connect with you Spark enthusiasts in NYC about helping to run an inaugural Spark Meetup in your area. You can reply to me directly if you are interested and I can tell you about all of the resources we have to offer (speakers from the core community, a budget for food, help scheduling, etc.), and let's make this happen! Andy
Re: error loading large files in PySpark 0.9.0
Thanks Matei, unfortunately that doesn't seem to fix it. I tried batchSize = 10, 100, as well as 1 (which should reproduce the 0.8.1 behavior?), and it stalls at the same point in each case.

-- Jeremy

- jeremy freeman, phd neuroscientist @thefreemanlab

On Mar 23, 2014, at 9:56 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Hey Jeremy, what happens if you pass batchSize=10 as an argument to your SparkContext? It tries to serialize that many objects together at a time, which might be too much. By default the batchSize is 1024.

Matei

On Mar 23, 2014, at 10:11 AM, Jeremy Freeman freeman.jer...@gmail.com wrote:

Hi all,

Hitting a mysterious error loading large text files, specific to PySpark 0.9.0. In PySpark 0.8.1, this works:

    data = sc.textFile("path/to/myfile")
    data.count()

But in 0.9.0, it stalls. There are indications of completion up to:

    14/03/17 16:54:24 INFO TaskSetManager: Finished TID 4 in 1699 ms on X.X.X.X (progress: 15/537)
    14/03/17 16:54:24 INFO DAGScheduler: Completed ResultTask(5, 4)

And then this repeats indefinitely:

    14/03/17 16:54:24 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5, runningTasks: 144
    14/03/17 16:54:25 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5, runningTasks: 144

Always stalls at the same place. There's nothing in stderr on the workers, but in stdout there are several of these messages:

    INFO PythonRDD: stdin writer to Python finished early

So perhaps the real error is being suppressed as in https://spark-project.atlassian.net/browse/SPARK-1025

Data is just rows of space-separated numbers, ~20GB, with 300k rows and 50k characters per row. Running on a private cluster with 10 nodes, 100GB / 16 cores each, Python v 2.7.6. I doubt the data is corrupted as it works fine in Scala in 0.8.1 and 0.9.0, and in PySpark in 0.8.1. Happy to post the file, but it should repro for anything with these dimensions. It *might* be specific to long strings: I don't see it with fewer characters (10k) per row, but I also don't see it with many fewer rows but the same number of characters per row.

Happy to try and provide more info / help debug!

-- Jeremy

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
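For reference, a hedged sketch of passing the batchSize argument Matei mentions when constructing the context; the master URL and app name are placeholders.

    from pyspark import SparkContext

    # smaller batches mean fewer objects get serialized together at a time
    sc = SparkContext("spark://master:7077", "myapp", batchSize=10)
    data = sc.textFile("path/to/myfile")
    data.count()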
error loading large files in PySpark 0.9.0
Hi all,

Hitting a mysterious error loading large text files, specific to PySpark 0.9.0. In PySpark 0.8.1, this works:

    data = sc.textFile("path/to/myfile")
    data.count()

But in 0.9.0, it stalls. There are indications of completion up to:

    14/03/17 16:54:24 INFO TaskSetManager: Finished TID 4 in 1699 ms on X.X.X.X (progress: 15/537)
    14/03/17 16:54:24 INFO DAGScheduler: Completed ResultTask(5, 4)

And then this repeats indefinitely:

    14/03/17 16:54:24 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5, runningTasks: 144
    14/03/17 16:54:25 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5, runningTasks: 144

Always stalls at the same place. There's nothing in stderr on the workers, but in stdout there are several of these messages:

    INFO PythonRDD: stdin writer to Python finished early

So perhaps the real error is being suppressed as in https://spark-project.atlassian.net/browse/SPARK-1025

Data is just rows of space-separated numbers, ~20GB, with 300k rows and 50k characters per row. Running on a private cluster with 10 nodes, 100GB / 16 cores each, Python v 2.7.6. I doubt the data is corrupted as it works fine in Scala in 0.8.1 and 0.9.0, and in PySpark in 0.8.1. Happy to post the file, but it should repro for anything with these dimensions. It *might* be specific to long strings: I don't see it with fewer characters (10k) per row, but I also don't see it with many fewer rows but the same number of characters per row.

Happy to try and provide more info / help debug!

-- Jeremy

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Machine Learning on streaming data
Thanks TD, happy to share my experience with MLlib + Spark Streaming integration. Here's a gist with two examples I have working, one for StreamingLinearRegression and another for StreamingKMeans.

https://gist.github.com/freeman-lab/9672685

The goal in each case was to implement a streaming version of the algorithm, using as much as possible directly from MLlib. For Linear Regression this was straightforward, because the MLlib version already uses a (stochastic) update rule, which I just use to update the model inside a foreachRDD(), using each new batch of data. For KMeans, I used the model class from MLlib, but extended it to keep a running count for each cluster. I also had to re-implement a chunk of the core algorithm in the form of an update rule. Tighter integration in this case would, I think, require refactoring some of MLlib (e.g. to use something like this update function), but this works fine.

One unresolved issue: for these kinds of algorithms, the dimensionality of the data must be known in advance. Would be cool to automatically detect it based on the first record.

-- Jeremy

On Mar 19, 2014, at 9:03 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

Yes, of course you can conceptually apply a machine learning algorithm on Spark Streaming. However the current MLlib does not yet have direct support for Spark Streaming's DStream. However, since DStreams are essentially a sequence of RDDs, you can apply MLlib algorithms on those RDDs. Take a look at the DStream.transform() and DStream.foreachRDD() operations, which allow you to access the RDDs in a DStream. You can apply MLlib functions on them.

Some people have attempted to make a tighter integration between MLlib and Spark Streaming. Jeremy (cc'ed) can say more about his adventures.

TD

On Sun, Mar 16, 2014 at 5:56 PM, Nasir Khan nasirkhan.onl...@gmail.com wrote:

Hi, I am working on a project in which I have to take streaming URLs, filter them, and classify each as benign or suspicious. Now, machine learning and streaming are two separate things in Apache Spark (AFAIK). My question is: can we apply online machine learning algorithms on streams? I am at a beginner level; kindly explain in a bit of detail, and if someone can direct me to some good material that will be great.

Thanks
Nasir Khan.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Machine-Learning-on-streaming-data-tp2732.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
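To make the running-count idea concrete, here is a hedged, NumPy-only sketch of the kind of update rule described (this is not the code from the gist): points from each new batch are assigned to the nearest center, and each center is moved by a count-weighted mean. Inside a streaming job, something like this would be applied to driver-held centers from within foreachRDD().

    import numpy as np

    def update_centers(points, centers, counts):
        # points: (n, d) batch; centers: (k, d); counts: (k,) running counts per cluster
        dists = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
        assignments = dists.argmin(axis=1)
        for j in range(centers.shape[0]):
            batch = points[assignments == j]
            if len(batch) > 0:
                new_count = counts[j] + len(batch)
                # weighted mean of the old center and this batch's points
                centers[j] = (centers[j] * counts[j] + batch.sum(axis=0)) / new_count
                counts[j] = new_count
        return centers, counts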
Re: example of non-line oriented input data?
Another vote on this: support for simple SequenceFiles and/or Avro would be terrific, as using plain text can be very space-inefficient, especially for numerical data.

-- Jeremy

On Mar 19, 2014, at 5:24 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

I'd second the request for Avro support in Python first, followed by Parquet.

On Wed, Mar 19, 2014 at 2:14 PM, Evgeny Shishkin itparan...@gmail.com wrote:

On 19 Mar 2014, at 19:54, Diana Carroll dcarr...@cloudera.com wrote:

Actually, thinking more on this question, Matei: I'd definitely say support for Avro. There's a lot of interest in this!!

Agree, and Parquet as the default Cloudera Impala format.

On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

BTW one other thing — in your experience, Diana, which non-text InputFormats would be most useful to support in Python first? Would it be Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or something else? I think a per-file text input format that does the stuff we did here would also be good.

Matei

On Mar 18, 2014, at 3:27 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Hi Diana,

This seems to work without the iter() in front if you just return treeiterator. What happened when you didn’t include that? Treeiterator should return an iterator.

Anyway, this is a good example of mapPartitions. It’s one where you want to view the whole file as one object (one XML here), so you couldn’t implement this using a flatMap, but you still want to return multiple values. The MLlib example you saw needs Python 2.7 because unfortunately that is a requirement for our Python MLlib support (see http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries). We’d like to relax this later but we’re using some newer features of NumPy and Python. The rest of PySpark works on 2.6.

In terms of the size in memory, here both the string s and the XML tree constructed from it need to fit in, so you can’t work on very large individual XML files. You may be able to use a streaming XML parser instead to extract elements from the data in a streaming fashion, without ever materializing the whole tree. http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader is one example.

Matei

On Mar 18, 2014, at 7:49 AM, Diana Carroll dcarr...@cloudera.com wrote:

Well, if anyone is still following this, I've gotten the following code working which in theory should allow me to parse whole XML files: (the problem was that I can't return the tree iterator directly. I have to call iter(). Why?)

    import xml.etree.ElementTree as ET

    # two source files, format <data><country name="...">...</country></data>
    mydata = sc.textFile("file:/home/training/countries*.xml")

    def parsefile(iterator):
        s = ''
        for i in iterator:
            s = s + str(i)
        tree = ET.fromstring(s)
        treeiterator = tree.getiterator("country")
        # why do I have to convert an iterator to an iterator? not sure but required
        return iter(treeiterator)

    mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()

The output is what I expect:

    [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]

BUT I'm a bit concerned about the construction of the string s. How big can my file be before converting it to a string becomes problematic?

On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll dcarr...@cloudera.com wrote:

Thanks, Matei.
In the context of this discussion, it would seem mapPartitions is essential, because it's the only way I'm going to be able to process each file as a whole, in our example of a large number of small XML files which need to be parsed as a whole file because records are not required to be on a single line.

The theory makes sense, but I'm still utterly lost as to how to implement it. Unfortunately there's only a single example of the use of mapPartitions in any of the Python example programs, which is the log regression example, which I can't run because it requires Python 2.7 and I'm on Python 2.6. (Aside: I couldn't find any statement that Python 2.6 is unsupported... is it?)

I'd really, really love to see a real-life example of a Python use of mapPartitions. I do appreciate the very simple examples you provided, but (perhaps because of my novice status with Python) I can't figure out how to translate those to a real-world situation in which I'm building RDDs from files, not inline collections like [(1,2),(2,3)].

Also, you say that the function called in mapPartitions can return a collection OR an iterator. I tried returning an iterator by calling the ElementTree getiterator function, but still got the error telling me my object was not an iterator. If anyone has a real-life example of mapPartitions returning a Python iterator, that would be fabulous.
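Since the request is for a real-life example of mapPartitions taking and returning a Python iterator, here is a hedged, minimal one (not from the thread); the data are inline only to keep it self-contained, but the same generator pattern applies to RDDs built from files.

    def running_sum(iterator):
        # a generator: yields one partial sum per element, so mapPartitions
        # gets back an iterator without the whole partition being held in a list
        total = 0
        for x in iterator:
            total += x
            yield total

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 2)
    rdd.mapPartitions(running_sum).collect()
    # [1, 3, 6, 4, 9, 15]  -- the sum restarts at each partition boundary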