Re: How to work with a joined rdd in pyspark?
Hi. Is your code like this?

    joined_dataset = show_channel.join(show_views)
    joined_dataset.take(4)

Note that .take(4) returns a local array (a list in PySpark), not an RDD. If you kept working with the result of .take(4), it no longer supports any RDD operations. Could that be the problem? Otherwise, more code is needed to understand what is going on.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-work-with-a-joined-rdd-in-pyspark-tp25510p25511.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark and simulated annealing
1) Start by looking at MLlib or KeystoneML.
2) If you can't find an implementation, start by analyzing the access patterns and data manipulations you will need to implement.
3) Then figure out whether it fits Spark's structures. When you realize it doesn't, start speculating on how you can twist or strong-arm it to fit :)
4) You skipped 2), didn't you? OK, so do that now and go back to 3).
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-simulated-annealing-tp25507p25513.html
Re: How to work with a joined rdd in pyspark?
Hi. Can't you do a filter to get only the ABC shows, map that into a pair keyed on the show, and then do a reduceByKey to sum up the views? Something like this in Scala code:

    val myAnswer = joined_dataset
      .filter(_._2._1 == "ABC")        // filter for the channel
      .map(r => (r._1, r._2._2))       // new pair (show, view count)
      .reduceByKey((a, b) => a + b)    // sum the view counts per show

This should give you an RDD with one record per show and the summed view count, but only for shows on ABC, right?
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-work-with-a-joined-rdd-in-pyspark-tp25510p25514.html
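Since the thread is about PySpark, here is a rough plain-Python sketch of what the filter / map / reduceByKey chain above computes. It runs on an ordinary list so it needs no cluster. The record layout (show, (channel, views)), the function name views_per_show and the sample data are assumptions made for illustration; they are not taken from the original question. On a real joined RDD the same three steps would be expressed with the RDD's filter, map and reduceByKey methods.

```python
from collections import defaultdict

# Assumed layout of a joined record: (show, (channel, views)).
# The sample data is made up for illustration.
joined = [
    ("show_a", ("ABC", 10)),
    ("show_a", ("ABC", 5)),
    ("show_b", ("DEF", 7)),
    ("show_c", ("ABC", 3)),
]


def views_per_show(records, channel):
    """Sum the view counts per show, keeping only records on `channel`."""
    totals = defaultdict(int)
    for show, (chan, views) in records:
        if chan == channel:        # the "filter" step
            totals[show] += views  # the "map" to (show, views) plus "reduceByKey"
    return dict(totals)


result = views_per_show(joined, "ABC")
print(result)
```

The dictionary plays the role of the reduced RDD: one entry per show, holding the summed view count for the chosen channel.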
Re: partition RDD of images
Look at KeystoneML; there is an image processing pipeline there. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/partition-RDD-of-images-tp25515p25518.html
Re: How to work with a joined rdd in pyspark?
Can't you just access it by element, like with [0] and [1]? http://www.tutorialspoint.com/python/python_tuples.htm -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-work-with-a-joined-rdd-in-pyspark-tp25510p25517.html
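As a small illustration of element access on a joined record, the sketch below indexes into a nested tuple. The record layout (show, (channel, views)) and the values are assumptions for the example, not data from the original question.

```python
# Hypothetical joined record, assuming the layout (show, (channel, views)).
record = ("show_a", ("ABC", 10))

show = record[0]        # the join key
channel = record[1][0]  # first element of the joined value
views = record[1][1]    # second element of the joined value

# Tuple unpacking extracts the same three fields in one step.
show2, (channel2, views2) = record

print(show, channel, views)
```

Inside a map or filter function the same indexing applies to each record the function receives.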
Re: WARN MemoryStore: Not enough space
"spark.storage.memoryFraction 0.05" If you want to keep a lot of data in memory, I think this must be a higher fraction. The default is 0.6 (not 0.0X). To change the directory Spark uses for its scratch data, you can set "spark.local.dir=/path/to/dir". You can even specify multiple directories (for example, if you have multiple mounted devices) by putting a ',' between the paths, i.e. "spark.local.dir=/path/to/dir1,/path/to/dir2" or --conf "spark.local.dir=/mnt/,/mnt2/" (at launch time). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/WARN-MemoryStore-Not-enough-space-tp25492p25500.html
Re: Why does a 3.8 T dataset take up 11.59 Tb on HDFS
HDFS has a default replication factor of 3, so every block is stored three times: 3 × 3.8 T = 11.4 T, which is close to the 11.59 Tb you are seeing. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-does-a-3-8-T-dataset-take-up-11-59-Tb-on-HDFS-tp25471p25497.html
Optimizing large collect operations
Hi. I am doing very large collectAsMap() operations, about 10,000,000 records, and I am getting "org.apache.spark.SparkException: Error communicating with MapOutputTracker" errors. Details:

    org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(1)]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
        ... 12 more
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
        ... 13 more

I have already set the akka.timeout to 300 etc. Does anyone have any ideas on what the problem could be? Regards, Gylfi.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Optimizing-large-collect-operations-tp25498.html
Re: How to lookup by a key in an RDD
Hi. You may want to look into Indexed RDDs https://github.com/amplab/spark-indexedrdd Regards, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-lookup-by-a-key-in-an-RDD-tp25243p25247.html
Re: job hangs when using pipe() with reduceByKey()
Hi. What is slow exactly? In code-base 1: when you run the persist() + count(), you store the result in RAM. The map + reduceByKey is then done on in-memory data. In the latter case (all in one line) you are doing both steps at the same time. So are you saying that if you sum up the time to do both steps in the first code-base, it is still much faster than the latter code-base? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242p25248.html
Re: Does Spark use more memory than MapReduce?
By default Spark does not actually keep the data at all; it just stores "how" to recreate the data. The programmer can, however, choose to keep the data once it has been instantiated by calling .persist() or .cache() on the RDD. .cache() stores the data in memory only (for an RDD it is the same as .persist() with the default MEMORY_ONLY level); partitions that do not fit are not cached and are recomputed when they are needed. .persist(StorageLevel.MEMORY_AND_DISK) uses memory but spills to disk if needed, and .persist(StorageLevel.DISK_ONLY) lets you write it all to disk (no in-memory overhead). See: http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence In addition, you can define your own StorageLevel, and thus if you have magnetic and SSD disks you can choose to persist the data to the disk level you want (depending on how "hot" you consider the data). Essentially, you have full freedom to do what you will with the data in Spark :) Hope this helps. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-use-more-memory-than-MapReduce-tp25030p25087.html
Re: Why the length of each task varies
Hi. Have you ruled out that this may just be I/O time? Word count is a very lightweight task for the CPU, but you will need to read the initial data from whatever storage device your HDFS is running on. As you have 3 machines with 22 cores each, but perhaps just one or a few HDD / SSD / NAS devices, the 22 cores may be saturating your I/O capacity, so that I/O determines the running time of your task. If it is some form of NAS storage, you may be saturating the network capacity. If this is the case, it would explain the fluctuations in the observed running times. A given map task may have been lucky, with its data read when the I/O was not busy, or unlucky, with many machine cores (map tasks) starting a new block at about the same time. Also, 22 * 256 MB = 5632 MB: this is the RAM you need to cache a block of data for each map task running in parallel on the same machine. Depending on how much RAM you have per node, you may want to re-block the data on HDFS for optimal performance. Hope this helps, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-the-length-of-each-task-varies-tp24008p24014.html
Re: Create RDD from output of unix command
You may want to look into using the pipe command: http://blog.madhukaraphatak.com/pipe-in-spark/ http://spark.apache.org/docs/0.6.0/api/core/spark/rdd/PipedRDD.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Create-RDD-from-output-of-unix-command-tp23723p23895.html
Re: Passing Broadcast variable as parameter
Hi. You can use a broadcast variable to make data available to all the nodes in your cluster, and that data can live longer than just the current distributed task. For example, if you need to access a large structure in multiple sub-tasks, instead of sending that structure again and again with each sub-task you can send it only once and access the data inside the operation (map, flatMap etc.) by way of the broadcast variable name followed by .value. See: https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables Note, however, that you should treat the broadcast variable as a read-only structure, as it is not synced between workers after it has been broadcast. To broadcast, your data must be serializable. If the data you are trying to broadcast is a distributed RDD (and thus presumably large), perhaps what you need is some form of join operation (or cogroup)? Regards, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Passing-Broadcast-variable-as-parameter-tp23760p23898.html
Re: write a HashMap to HDFS in Spark
Hi. Assuming you have the data in an RDD, you can save your RDD (regardless of structure) with nameRDD.saveAsObjectFile(path), where path can be hdfs:///myfolderonHDFS or a path on the local file system. Alternatively, you can also use .saveAsTextFile(). Regards, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/write-a-HashMap-to-HDFS-in-Spark-tp23813p23897.html
Re: Spark same execution time on 1 node and 5 nodes
Hi. If I just look at the two pics, I see that there is only one sub-task that takes all the time. This is the flatMapToPair at Coef... line 52. I also see that there are only two partitions that make up the input, and thus probably only two workers are active. Try repartitioning the data into more parts before line 52, for example by calling rddname.repartition(10), and see if it runs faster. Regards, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-same-execution-time-on-1-node-and-5-nodes-tp23866p23893.html
Re: Flatten list
Hi. To be honest, I don't really understand your problem description :( but let's just talk about how .flatMap() works. Unlike .map(), which only allows a one-to-one transformation, .flatMap() allows 0, 1 or many outputs per item processed, but the output must take the form of a sequence of the same type, like a List for example. All the sequences are then merged (i.e. flattened) in the end into a single RDD of that type. Note, however, that an Array does not itself inherit from Seq. Scala will normally convert it implicitly, but you can also transform it explicitly to a Seq or something that inherits from AbstractSeq, like a List. See http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.List vs. http://www.scala-lang.org/api/current/index.html#scala.Array

For example, let's assume you have an RDD[Array[Int]] and you want all the Int values flattened into a single RDD[Int]. The code would be something like this:

    val intArraysRDD: RDD[Array[Int]] = ...  // some code to get the arrays
    val flattenedIntRDD: RDD[Int] = intArraysRDD.flatMap(array => {
      var ret: List[Int] = Nil
      for (i <- array) {
        ret = i :: ret   // prepends, so each array's values come out in reverse order
      }
      ret
    })

This is an intentionally explicit version. A simpler version would be something like this:

    val flattenedIntRDD: RDD[Int] = intArraysRDD.flatMap(array => array.toList)

However, to understand your problem exactly, you need to explain better what the RDD you want to create should look like. Regards, Gylfi.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Flatten-list-tp23887p23892.html
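To make the "0, 1 or many outputs per item" behaviour concrete, here is a minimal plain-Python sketch of what a flatMap does. The helper flat_map is a hypothetical stand-in written for this example; it is not part of Spark and only mimics the semantics on a local list.

```python
from itertools import chain


def flat_map(func, items):
    """Apply func to every item and concatenate the sequences it returns."""
    return list(chain.from_iterable(func(item) for item in items))


# Flatten a collection of integer arrays into a single list of integers.
int_arrays = [[1, 2, 3], [], [4], [5, 6]]
flattened = flat_map(lambda arr: arr, int_arrays)

# Each input may yield zero, one or many outputs:
# 0 yields nothing, 1 yields one value, 2 yields two values, and so on.
repeated = flat_map(lambda n: [n] * n, [0, 1, 2, 3])

print(flattened)
print(repeated)
```

The empty array contributes nothing to the result, which is the case a one-to-one map cannot express.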
Re: Using reference for RDD is safe?
Hi. All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently: for example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset. See the section RDD Operations in https://spark.apache.org/docs/1.2.0/programming-guide.html Thus, neither your myrdd2 nor myrdd will exist until you call the count. What is stored is just how to create myrdd and myrdd2, so yes, this is safe. When you run myrdd2.count, both RDDs are created, myrdd2 is counted and the count is printed out. After the operation both RDDs are destroyed again. If you run myrdd2.count again, both myrdd and myrdd2 are created again. If your transformation is expensive, you may want to keep the data around, and for that you must use .persist() or .cache() etc. Regards, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-reference-for-RDD-is-safe-tp23843p23894.html
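The recompute-on-every-action behaviour described above can be imitated in a few lines of plain Python. The class LazyDataset below is a toy stand-in invented for this sketch; it is not Spark's implementation and only models the idea that a dataset stores its recipe, that an action triggers the computation, and that caching avoids repeating it.

```python
class LazyDataset:
    """Toy stand-in for an RDD: it stores how to build the data, not the data."""

    def __init__(self, compute):
        self._compute = compute  # recipe: zero-argument function returning a list
        self._keep = False       # set by cache()
        self._cached = None
        self.evaluations = 0     # how many times the recipe actually ran

    def map(self, func):
        # Transformation: only builds a new recipe, nothing is computed yet.
        return LazyDataset(lambda: [func(x) for x in self._materialize()])

    def cache(self):
        # Ask for the data to be kept after the first computation.
        self._keep = True
        return self

    def _materialize(self):
        if self._cached is not None:
            return self._cached
        self.evaluations += 1
        data = self._compute()
        if self._keep:
            self._cached = data
        return data

    def count(self):
        # Action: forces the recipe (and its parents) to run.
        return len(self._materialize())


myrdd = LazyDataset(lambda: [1, 2, 3, 4])
myrdd2 = myrdd.map(lambda x: x * 10)
runs_before_action = myrdd.evaluations  # nothing has been computed so far

first_count = myrdd2.count()
second_count = myrdd2.count()           # rebuilds myrdd and myrdd2 again

cached = LazyDataset(lambda: [1, 2, 3, 4]).cache()
derived = cached.map(lambda x: x * 10)
derived.count()
derived.count()                         # the cached parent is reused

print(myrdd.evaluations, cached.evaluations)
```

Without caching, the parent recipe runs once per action; with cache() it runs only for the first action.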
Re: No. of Task vs No. of Executors
You could even try changing the block size of the input data on HDFS (can be done on a per file basis) and that would get all workers going right from the get-go in Spark. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-of-Task-vs-No-of-Executors-tp23824p23896.html
Re: K Nearest Neighbours
Hi. What I would do in your case would be something like this. Let's call the two datasets qs and ds, where qs is an array of vectors and ds is an RDD[(dsID: Long, Vector)]. Do the following:

1) Create a k-NN class that can keep track of the k nearest neighbors so far. It must have a qsID, some structure for the k nearest neighbors, Seq[(dsID: Long, Distance: Long)], and a function .add(nn: (Long, Vector)) that does the distance calculation and updates the k-NN when appropriate.
2) Collect the qs and key it as well, so each qs has an ID, i.e. qs = Array[(qsID: Long, Vector)].

Now, what you want to do is not create all the distance stuff, but just the k-NNs. To do this we will actually create a few k-NNs for each query vector, one for each partition, and then merge them later.

3) Do a ds.mapPartitions() and inside the function create a k-NN for each qs, scan the ds points of the partition and output an iterator pointing to the set of k-NNs created.

    val k = 100
    val qs: Array[(Long, Vector)] = ...   // keyed query vectors, see step 2
    val ds: RDD[(Long, Vector)] = ...
    val knnResults = ds.mapPartitions(itr => {
      val knns = qs.map(qp => (qp._1, new KNNClass(k, qp)))
      itr.foreach(dp => {
        knns.foreach(knn => knn._2.add(dp))
      })
      knns.iterator
    })

Now you have one k-NN per partition for each query point, but this we can simply fix by doing a reduceByKey and merging all the k-NNs for each qsID into a single k-NN.

    val knnResultFinal = knnResults.reduceByKey((a, b) => KNNClass.merge(a, b))

Here KNNClass.merge is a static function that merges two k-NNs, i.e. it simply concatenates them, sorts on distance, takes the top k values and returns them as a new k-NN instance. If you want to control how many k-NNs are created, you can always repartition ds. How does that sound? Does this make any sense? :) Regards, Gylfi.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/K-Nearest-Neighbours-tp23759p23899.html
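The per-partition k-NN idea and the merge step can be tried out locally with a short plain-Python sketch. Everything in it is an assumption made for illustration: the function names, the squared Euclidean distance and the tiny data set are invented, partitions are modelled as ordinary lists, and heapq.nsmallest replaces the incremental .add() of the k-NN class described above.

```python
import heapq


def squared_distance(u, v):
    """Squared Euclidean distance (assumed metric for this sketch)."""
    return sum((x - y) ** 2 for x, y in zip(u, v))


def knn_for_partition(queries, partition, k, distance):
    """Build one k-NN list per query from a single partition of ds."""
    result = {}
    for q_id, q_vec in queries:
        scored = ((distance(q_vec, d_vec), d_id) for d_id, d_vec in partition)
        result[q_id] = heapq.nsmallest(k, scored)  # sorted (distance, dsID) pairs
    return result


def merge_knn(a, b, k):
    """Merge two k-NN lists: concatenate, sort on distance, keep the top k."""
    return heapq.nsmallest(k, a + b)


# Keyed query vectors (qsID, vector) and two "partitions" of (dsID, vector).
queries = [(0, (0.0, 0.0)), (1, (10.0, 10.0))]
partitions = [
    [(100, (1.0, 0.0)), (101, (9.0, 9.0))],
    [(102, (0.0, 2.0)), (103, (10.0, 11.0))],
]
k = 2

# Step 3: one partial k-NN per query and partition.
partials = [knn_for_partition(queries, p, k, squared_distance) for p in partitions]

# Merge step: combine the partial k-NNs of each query, as reduceByKey would.
final = {}
for partial in partials:
    for q_id, knn in partial.items():
        final[q_id] = merge_knn(final.get(q_id, []), knn, k)

print(final)
```

Because every partial list already holds at most k entries, the merge only ever sorts 2k candidates per query, which is what keeps the reduce step cheap.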
Re: how to black list nodes on the cluster
Hi again. OK, I do not know of any way to fix the problem other than deleting the bad machine from the config and restarting, and you will need admin privileges on the cluster for that :( However, before we give up on speculative execution: I suspect that the task is being run again and again on the same faulty machine because that is where the data resides. You could try to store / persist your RDD with MEMORY_ONLY_2 or MEMORY_AND_DISK_2, as that will force the creation of a replica of the data on another node. Thus, with two nodes, the scheduler may choose to execute the speculative task on the second node (I'm not sure about this, as I am just not familiar enough with the Spark scheduler's priorities). I'm not very hopeful, but it may be worth a try (if you have the disk/RAM space to be able to afford to duplicate all the data, that is). If not, I am afraid I am out of ideas ;) Regards and good luck, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-black-list-nodes-on-the-cluster-tp23650p23704.html
Re: How do we control output part files created by Spark job?
Hi. I am just wondering if the rdd was actually modified. Did you test it by printing rdd.partitions.length before and after? Regards, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649p23705.html
Re: Why Kryo Serializer is slower than Java Serializer in TeraSort
Hi. Just a few quick comments on your question. If you drill into the subtasks (click their links), you can get a more detailed view of the tasks. One of the things reported is the time spent on serialization. If that is your dominant factor, it should be reflected there, right? Are you sure the input data is not getting cached between runs (i.e. does the order of the experiments matter, and did you explicitly flush the operating system's memory between runs, etc.)? If you now run the old experiment again, does it take the same amount of time again? Did you validate that the results were actually correct? Hope this helps. Regards, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-Kryo-Serializer-is-slower-than-Java-Serializer-in-TeraSort-tp23621p23659.html
Re: How do we control output part files created by Spark job?
Hi. Have you tried to repartition the finalRDD before saving? This link might help. http://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_the_rdd_to_files.html Regards, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649p23660.html
Re: how to black list nodes on the cluster
Hi. Have you tried to enable speculative execution? This will allow Spark to run the same sub-task of the job on other available slots when slow tasks are encountered. It can be set at execution time; the parameters are:

    spark.speculation
    spark.speculation.interval
    spark.speculation.multiplier
    spark.speculation.quantile

See https://spark.apache.org/docs/latest/configuration.html under Scheduling. Regards, Gylfi.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-black-list-nodes-on-the-cluster-tp23650p23661.html