Re: Web Service + Spark
You can take a look at http://zeppelin.incubator.apache.org. It is a notebook and graphical visual designer.

On Sun, Jan 11, 2015, 01:45 Cui Lin cui@hds.com wrote: Thanks, Gaurav and Corey. Probably I didn't make myself clear. I am looking for a best practice for Spark similar to Shiny for R, where analysis/visualization results can easily be published to a web server and viewed in a web browser. Or is there any dashboard for Spark? Best regards, Cui Lin

On Friday, January 9, 2015, gtinside gtins...@gmail.com wrote: You can also look at Spark Job Server: https://github.com/spark-jobserver/spark-jobserver - Gaurav

On Jan 9, 2015, at 10:25 PM, Corey Nolet cjno...@gmail.com wrote: Cui Lin, the solution largely depends on how you want your services deployed (Java web container, Spray framework, etc.) and whether you are using a cluster manager like YARN or Mesos vs. just firing up your own executors and master. I recently worked on an example of deploying Spark services inside Jetty, using YARN as the cluster manager. It forced me to learn how Spark wires up the dependencies/classpaths. If it helps, the example that resulted from my tinkering is located at [1]. [1] https://github.com/calrissian/spark-jetty-server

On Fri, Jan 9, 2015 at 9:33 PM, Cui Lin cui@hds.com wrote: Hello all, what's the best practice for deploying/publishing Spark-based scientific applications as a web service, similar to Shiny for R? Thanks! Best regards, Cui Lin
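For a concrete feel for the Spark Job Server option mentioned above, here is a minimal sketch of a job written against it. This is written from memory of the spark-jobserver README, so treat the trait and package names (spark.jobserver.SparkJob, SparkJobValidation, SparkJobValid) as assumptions to verify against the version you deploy:

    import com.typesafe.config.Config
    import org.apache.spark.SparkContext
    import spark.jobserver.{SparkJob, SparkJobValid, SparkJobValidation}

    // A job the server can run against a long-lived, shared SparkContext;
    // the return value of runJob is serialized back to the REST caller.
    object WordCountJob extends SparkJob {
      override def validate(sc: SparkContext, config: Config): SparkJobValidation =
        SparkJobValid // a real job would check that config.hasPath("input")

      override def runJob(sc: SparkContext, config: Config): Any =
        sc.parallelize(config.getString("input").split("\\s+").toSeq)
          .countByValue()
    }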
Re: How to set UI port #?
Hi YaoPau, you can set `spark.ui.port` to 0 and the application will bind to a random available port.

2015-01-11 16:38 GMT+08:00 YaoPau jonrgr...@gmail.com: I have multiple Spark Streaming jobs running all day, and so when I run my hourly batch job, I always get a java.net.BindException: Address already in use, which starts at 4040 and then tries 4041, 4042, and 4043 before settling at 4044. [...]
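A minimal sketch of both options, assuming an existing driver program; spark.ui.port is a standard Spark property, and spark.ui.enabled (to skip launching the UI entirely) is documented for Spark 1.x, though it is worth verifying against your version:

    import org.apache.spark.{SparkConf, SparkContext}

    // Bind the web UI to an ephemeral port chosen by the OS instead of
    // probing 4040, 4041, 4042, ... one failed bind at a time.
    val conf = new SparkConf()
      .setAppName("hourly-batch")
      .set("spark.ui.port", "0")
      // .set("spark.ui.enabled", "false") // or don't launch the UI at all
    val sc = new SparkContext(conf)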
Re: ALS.trainImplicit running out of mem when using higher rank
The question really is whether it is expected that the memory requirements grow rapidly with the rank... I would expect memory to be rather O(1), with a dependency only on the size of the input data. If this is expected, is there any rough formula to determine the required memory based on the ALS input and parameters? Thanks, Antony.

On Saturday, 10 January 2015, 10:47, Antony Mayi antonym...@yahoo.com wrote: The actual case looks like this:
* spark 1.1.0 on yarn (cdh 5.2.1)
* ~8-10 executors, 36GB phys RAM per host
* input RDD is roughly 3GB containing ~150-200M items (and this RDD is made persistent using .cache())
* using pyspark

yarn is configured with the limit yarn.nodemanager.resource.memory-mb of 33792 (33GB), and spark is set to be:
SPARK_EXECUTOR_CORES=6
SPARK_EXECUTOR_INSTANCES=9
SPARK_EXECUTOR_MEMORY=30G

When using a higher rank (above 20) for ALS.trainImplicit, the executor runs out of the yarn limit after some time (~an hour) of execution and gets killed:

2015-01-09 17:51:27,130 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=27125,containerID=container_1420871936411_0002_01_23] is running beyond physical memory limits. Current usage: 31.2 GB of 31 GB physical memory used; 34.7 GB of 65.1 GB virtual memory used. Killing container.

Thanks for any ideas, Antony.

On Saturday, 10 January 2015, 10:11, Antony Mayi antonym...@yahoo.com wrote: The memory requirements seem to be rapidly growing when using a higher rank... I am unable to get over 20 without running out of memory. Is this expected? Thanks, Antony.
How to set UI port #?
I have multiple Spark Streaming jobs running all day, and so when I run my hourly batch job, I always get a java.net.BindException: Address already in use, which starts at 4040 and then tries 4041, 4042, and 4043 before settling at 4044. That slows down my hourly job, and time is critical. Is there a way I can set it to 4044 by default, or prevent the UI from launching altogether? Jon
Re: Does DecisionTree model in MLlib deal with missing values?
I do not recall seeing support for missing values. Categorical values are encoded as 0.0, 1.0, 2.0, ... When training the model, you indicate which features are interpreted as categorical with the categoricalFeaturesInfo parameter, which maps a feature offset to the count of distinct categorical values for that feature.

On Sun, Jan 11, 2015 at 6:54 AM, Carter gyz...@hotmail.com wrote: Hi, I am new to MLlib in Spark. Can the DecisionTree model in MLlib deal with missing values? If so, what data structure should I use for the input? Moreover, my data has categorical features, but LabeledPoint requires the double data type; in this case, what can I do? Thank you very much.
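To make the encoding concrete, a small hedged sketch against the MLlib 1.2 API, assuming an existing SparkContext sc (the feature values and category counts here are invented):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.tree.DecisionTree

    // Feature 0 is categorical with 3 distinct values, encoded as 0.0, 1.0, 2.0;
    // feature 1 is continuous.
    val data = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(0.0, 1.5)),
      LabeledPoint(1.0, Vectors.dense(2.0, 0.3)),
      LabeledPoint(1.0, Vectors.dense(1.0, 0.7))))

    val model = DecisionTree.trainClassifier(
      data,
      numClasses = 2,
      categoricalFeaturesInfo = Map(0 -> 3), // feature offset -> category count
      impurity = "gini",
      maxDepth = 5,
      maxBins = 32)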
Re: ALS.trainImplicit running out of mem when using higher rank
I would expect the size of the user/item feature RDDs to grow linearly with the rank, of course. They are cached, so that would drive cache memory usage on the cluster. This wouldn't cause executors to fail for running out of memory, though. In fact, your error does not show the task failing for lack of memory. What it shows is that YARN thinks the task is using a little more memory than it said it would, and killed it. This happens sometimes with JVM-based YARN jobs, since a JVM configured to use X heap ends up using a bit more than X physical memory if the heap reaches max size. So there's a bit of headroom built in, controlled by spark.yarn.executor.memoryOverhead (http://spark.apache.org/docs/latest/running-on-yarn.html). You can try increasing it to a couple of GB.

On Sun, Jan 11, 2015 at 9:43 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: [...]
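A hedged sketch of the setting Sean describes, assuming an existing SparkConf; in Spark 1.1/1.2 the overhead value is specified in megabytes:

    import org.apache.spark.SparkConf

    // Keep the executor heap below the container limit and give YARN ~2 GB of
    // headroom for JVM overhead beyond the heap.
    val conf = new SparkConf()
      .set("spark.executor.memory", "28g")               // JVM heap
      .set("spark.yarn.executor.memoryOverhead", "2048") // extra container room, in MB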
Re: train many decision tress with a single spark job
You just mean you want to divide the data set into N subsets, dividing by user, and not make one model per user, right? I suppose you could filter the source RDD N times and build a model for each resulting subset. This can be parallelized on the driver. For example, let's say you divide into N subsets depending on the value of the user ID modulo N:

    val N = ...
    (0 until N).par.map(d => DecisionTree.train(data.filter(_.userID % N == d), ...))

data should be cache()-ed here, of course. However, it may be faster and more principled to take random subsets directly:

    data.randomSplit(Array.fill(N)(1.0 / N)).par.map(subset => DecisionTree.train(subset, ...))

On Sun, Jan 11, 2015 at 1:53 AM, Josh Buffum jbuf...@gmail.com wrote: I've got a data set of activity by user. For each user, I'd like to train a decision tree model. I currently have the feature creation step implemented in Spark and would naturally like to use MLlib's decision tree model. However, it looks like the decision tree model expects the whole RDD and will train a single tree. Can I split the RDD by user (i.e. groupByKey) and then call DecisionTree.trainClassifier in a reduce() or aggregate function to create an RDD[DecisionTreeModel]? Maybe train the model with an in-memory dataset instead of an RDD? Call sc.parallelize on the Iterable values in a groupBy to create a mini-RDD? Has anyone else tried something like this with success? Thanks!
Re: ScalaReflectionException when using saveAsParquetFile in sbt
I have the same exception when I run the following example from the Spark SQL Programming Guide (Spark 1.2.0 documentation). But it works fine when I run it from IntelliJ. I am using Spark 1.2 for Scala 2.10.4, with sbt 0.13.7.

    import org.apache.spark.SparkContext

    // Define the schema using a case class.
    // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
    // you can use custom classes that implement the Product interface.
    case class Person(name: String, age: Int)

    object SparkSqlDemo {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SparkSqlTest")

        // sc is an existing SparkContext.
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
        import sqlContext.createSchemaRDD

        // Create an RDD of Person objects and register it as a table.
        val people = sc.textFile("src/test/resources/people.txt")
          .map(_.split(","))
          .map(p => Person(p(0), p(1).trim.toInt))
        people.registerTempTable("people")

        // SQL statements can be run by using the sql methods provided by sqlContext.
        val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

        // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
        // The columns of a row in the result can be accessed by ordinal.
        teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
      }
    }

Shing

On Wednesday, 7 January 2015, 21:55, figpope drewii...@gmail.com wrote: I'm on Spark 1.2.0, with Scala 2.11.2, and SBT 0.13.7.
When running:

    case class Test(message: String)

    val sc = new SparkContext("local", "shell")
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val testing = sc.parallelize(List(Test("this"), Test("is"), Test("a"), Test("test")))
    testing.saveAsParquetFile("test")

I get the following error:

scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with java.net.URLClassLoader@64fc9229 of type class java.net.URLClassLoader with classpath [file:/Users/andrew/.ivy2/cache/org.scala-lang/scala-library/jars/scala-library-2.11.2.jar,file:/Users/andrew/.ivy2/cache/org.scala-lang/scala-compiler/jars/scala-compiler-2.11.2.jar,file:/Users/andrew/.ivy2/cache/org.scala-lang/scala-reflect/jars/scala-reflect-2.11.2.jar,file:/Users/andrew/.ivy2/cache/org.scala-lang.modules/scala-xml_2.11/bundles/scala-xml_2.11-1.0.2.jar,file:/Users/andrew/.ivy2/cache/org.scala-lang.modules/scala-parser-combinators_2.11/bundles/scala-parser-combinators_2.11-1.0.2.jar,file:/Users/andrew/.ivy2/cache/jline/jline/jars/jline-2.12.jar]
and parent being xsbt.boot.BootFilteredLoader@1f421ab0 of type class xsbt.boot.BootFilteredLoader with classpath [unknown]
and parent being sun.misc.Launcher$AppClassLoader@372f2b32 of type class sun.misc.Launcher$AppClassLoader with classpath [file:/usr/local/Cellar/sbt/0.13.7/libexec/sbt-launch.jar]
and parent being sun.misc.Launcher$ExtClassLoader@79bcfbeb of type class sun.misc.Launcher$ExtClassLoader with classpath [file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/dnsns.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/localedata.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/sunec.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/zipfs.jar,file:/System/Library/Java/Extensions/AppleScriptEngine.jar,file:/System/Library/Java/Extensions/dns_sd.jar,file:/System/Library/Java/Extensions/j3daudio.jar,file:/System/Library/Java/Extensions/j3dcore.jar,file:/System/Library/Java/Extensions/j3dutils.jar,file:/System/Library/Java/Extensions/jai_codec.jar,file:/System/Library/Java/Extensions/jai_core.jar,file:/System/Library/Java/Extensions/libAppleScriptEngine.jnilib,file:/System/Library/Java/Extensions/libJ3D.jnilib,file:/System/Library/Java/Extensions/libJ3DAudio.jnilib,file:/System/Library/Java/Extensions/libJ3DUtils.jnilib,file:/System/Library/Java/Extensions/libmlib_jai.jnilib,file:/System/Library/Java/Extensions/libQTJNative.jnilib,file:/System/Library/Java/Extensions/mlibwrapper_jai.jar,file:/System/Library/Java/Extensions/MRJToolkit.jar,file:/System/Library/Java/Extensions/QTJava.zip,file:/System/Library/Java/Extensions/vecmath.jar,file:/usr/lib/java/libjdns_sd.jnilib]
and parent being primordial classloader with boot classpath
Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats?
On 1/11/15 1:40 PM, Nathan McCarthy wrote:
> Thanks Cheng and Michael! Makes sense. Appreciate the tips! Idiomatic Scala isn't performant. I'll definitely start using while loops or tail-recursive methods. I have noticed this in the Spark code base.
>
> I might try turning off columnar compression (via spark.sql.inMemoryColumnarStorage.compressed=false, correct?) and see how performance compares to the primitive objects. Would you expect to see similar runtimes vs. the primitive objects? We do have the luxury of lots of memory at the moment, so this might give us an additional performance boost.

Turning off compression should be faster, but still slower than directly using primitive objects, because Spark SQL also serializes all objects within a column into byte buffers in a compact format. However, this radically reduces the number of Java objects in the heap and is more GC friendly. When running large queries, the cost introduced by GC can be significant.

> Regarding the defensive copying of row objects: can we switch this off and just be aware of the risks? Is mapPartitions on SchemaRDDs and operating on the Row object the most performant way to flip between SQL and Scala user code? Is there anything else I could be doing?

This can be very dangerous and error prone. Whenever an operator tries to cache row objects, turning off defensive copying can introduce wrong query results. For example, sort-based shuffle caches rows to do sorting. In some cases, the sample operator may also cache row objects. This is very implementation specific and may change between versions.

> Cheers, ~N

From: Michael Armbrust mich...@databricks.com
Date: Saturday, 10 January 2015 3:41 am
To: Cheng Lian lian.cs@gmail.com
Cc: Nathan nathan.mccar...@quantium.com.au, user@spark.apache.org
Subject: Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats?

The other thing to note here is that Spark SQL defensively copies rows when we switch into user code. This probably explains the difference between 1 and 2. The difference between 1 and 3 is likely the cost of decompressing the column buffers vs. accessing a bunch of uncompressed primitive objects.

On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian lian.cs@gmail.com wrote: Hey Nathan, thanks for sharing, this is a very interesting post :) My comments are inlined below. Cheng

On 1/7/15 11:53 AM, Nathan McCarthy wrote: Hi, I'm trying to use a combination of SparkSQL and 'normal' Spark/Scala via rdd.mapPartitions(...), using the latest release 1.2.0. Simple example: load up some sample data from Parquet on HDFS (about 380m rows, 10 columns) on a 7-node cluster.

    val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
    t.registerTempTable("test1")
    sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales quantities sold for each hour in the day, so I choose 3 out of the 10 possible columns...

    sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes around 11 seconds. Let's do the same thing but via a mapPartitions call (this isn't production-ready code but gets the job done).
    val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
    rddPC.mapPartitions { case hrs =>
      val qtySum = new Array[Double](24)
      val salesSum = new Array[Double](24)

      for (r <- hrs) {
        val hr = r.getInt(0)
        qtySum(hr) += r.getDouble(1)
        salesSum(hr) += r.getDouble(2)
      }
      (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
    }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

I believe the evil thing that makes this snippet much slower is the for-loop. According to an early benchmark of mine done with Scala 2.9, a for-loop can be orders of magnitude slower than a simple while-loop, especially when the body of the loop only does something as trivial as in this case. The reason is that a Scala for-loop is translated into corresponding foreach/map/flatMap/withFilter function calls. And that's exactly why Spark SQL tries to avoid for-loops or any other functional-style code in critical paths (where every row is touched); we also use reusable mutable row objects instead of the immutable version to improve performance. You may check HiveTableScan, ParquetTableScan, InMemoryColumnarTableScan, etc. for reference. Also, the `sum` function calls in your SQL code are translated into `o.a.s.s.execution.Aggregate` operators, which also use imperative while-loops and reusable mutable rows. Another thing to notice is...
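To make the while-loop point concrete, here is a hedged rewrite of the snippet above in the imperative style described; the logic is unchanged, it just walks the partition iterator directly instead of going through the for-comprehension's foreach calls:

    rddPC.mapPartitions { hrs =>
      val qtySum = new Array[Double](24)
      val salesSum = new Array[Double](24)
      while (hrs.hasNext) {        // plain while loop: no per-row closure invocation
        val r = hrs.next()
        val hr = r.getInt(0)
        qtySum(hr) += r.getDouble(1)
        salesSum(hr) += r.getDouble(2)
      }
      (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
    }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)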
Re: Job priority
If you set up a number of pools equal to the number of different priority levels you want, make the relative weights of those pools very different, and submit each job to the pool representing its priority, I think you'll get behavior equivalent to a priority queue. Try it and see. If I'm misunderstanding what you're trying to do, then I don't know.

On Sunday, January 11, 2015, Alessandro Baretta alexbare...@gmail.com wrote: Cody, maybe I'm not getting this, but it doesn't look like this page is describing a priority-queue scheduling policy. What this section discusses is how resources are shared between queues. A weight-1000 pool will get 1000 times more resources allocated to it than a weight-1 pool. Great, but not what I want. I want to be able to define an Ordering on my tasks representing their priority, and have Spark allocate all resources to the job that has the highest priority. Alex

On Sat, Jan 10, 2015 at 10:11 PM, Cody Koeninger c...@koeninger.org wrote: http://spark.apache.org/docs/latest/job-scheduling.html#configuring-pool-properties "Setting a high weight such as 1000 also makes it possible to implement priority between pools—in essence, the weight-1000 pool will always get to launch tasks first whenever it has jobs active."

On Sat, Jan 10, 2015 at 11:57 PM, Alessandro Baretta alexbare...@gmail.com wrote: Mark, thanks, but I don't see how this documentation solves my problem. You are referring me to documentation of fair scheduling; whereas, I am asking about as unfair a scheduling policy as can be: a priority queue. Alex

On Sat, Jan 10, 2015 at 5:00 PM, Mark Hamstra m...@clearstorydata.com wrote: -dev, +user http://spark.apache.org/docs/latest/job-scheduling.html

On Sat, Jan 10, 2015 at 4:40 PM, Alessandro Baretta alexbare...@gmail.com wrote: Is it possible to specify a priority level for a job, such that active jobs might be scheduled in order of priority? Alex
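A hedged sketch of that setup, per the pool-properties documentation linked in the thread above (pool names and weights here are illustrative). In conf/fairscheduler.xml:

    <allocations>
      <pool name="critical">
        <schedulingMode>FIFO</schedulingMode>
        <weight>1000000</weight>
      </pool>
      <pool name="background">
        <schedulingMode>FIFO</schedulingMode>
        <weight>1</weight>
      </pool>
    </allocations>

Then run with spark.scheduler.mode=FAIR and spark.scheduler.allocation.file pointing at that XML, and route each job to its priority pool from the submitting thread:

    // Jobs submitted from this thread land in the high-weight pool and win
    // almost all contention against "background" jobs.
    sc.setLocalProperty("spark.scheduler.pool", "critical")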
Re: Spark SQL: Storing AVRO Schema in Parquet
I think the AvroWriteSupport class already saves the Avro schema as part of the Parquet metadata. You can think of using parquet-mr (https://github.com/Parquet/parquet-mr) directly. Raghavendra

On Fri, Jan 9, 2015 at 10:32 PM, Jerry Lam chiling...@gmail.com wrote: Hi Raghavendra, this makes a lot of sense. Thank you. The problem is that I'm using Spark SQL right now to generate the parquet file. What I think I need to do is to use Spark directly, transform all rows from the SchemaRDD to Avro objects, and supply them to saveAsNewAPIHadoopFile (from the PairRDD). From there, I can supply the Avro schema to Parquet via AvroParquetOutputFormat. It is not difficult, just not as simple as I would like, because a SchemaRDD can write to a Parquet file using its schema, and if I could supply the Avro schema to Parquet it would save me the transformation step to Avro objects. I'm thinking of overriding the saveAsParquetFile method to allow me to persist the Avro schema inside Parquet. Is this possible at all? Best Regards, Jerry

On Fri, Jan 9, 2015 at 2:05 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: I came across this: http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/. You can take a look.

On Fri, Jan 09, 2015 at 12:08:49 PM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: I have a similar kind of requirement where I want to push Avro data into Parquet. But it seems you have to do it on your own. There is the parquet-mr project that uses Hadoop to do so. I am trying to write a Spark job to do a similar kind of thing.

On Fri, Jan 9, 2015 at 3:20 AM, Jerry Lam chiling...@gmail.com wrote: Hi Spark users, I'm using Spark SQL to create Parquet files on HDFS. I would like to store the Avro schema in the Parquet metadata so that non-Spark-SQL applications can marshal the data without the Avro schema, using the Avro Parquet reader. Currently, schemaRDD.saveAsParquetFile does not allow that. Is there another API that allows me to do this? Best Regards, Jerry
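A hedged sketch of the route Jerry outlines, using parquet-mr's Avro bindings from that era (the parquet.avro package); MyRecord stands in for a hypothetical Avro-generated class, and the exact signatures should be checked against your parquet-mr version:

    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.SparkContext._
    import parquet.avro.AvroParquetOutputFormat

    // Attach the Avro schema to the write path; AvroParquetOutputFormat stores
    // it in the Parquet footer metadata so non-Spark readers can recover it.
    val job = new Job(sc.hadoopConfiguration)
    AvroParquetOutputFormat.setSchema(job, MyRecord.getClassSchema)

    avroRdd                      // RDD[MyRecord], e.g. mapped from SchemaRDD rows
      .map(r => (null, r))       // the key is ignored by the output format
      .saveAsNewAPIHadoopFile(
        "hdfs:///tmp/out.parquet",
        classOf[Void],
        classOf[MyRecord],
        classOf[AvroParquetOutputFormat],
        job.getConfiguration)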
Re: Removing JARs from spark-jobserver
Thank you Abhishek. That works.
Re: Job priority
Cody, I might be able to improve the scheduling of my jobs by using a few different pools with weights equal to, say, 1, 1e3, and 1e6, effectively getting a small handful of priority classes. Still, this is really not quite what I am describing; this is why my original post was on the dev list. Let me then ask if there is any interest in having priority-queue job scheduling in Spark. This is something I might be able to pull off. Alex

On Sun, Jan 11, 2015 at 6:21 AM, Cody Koeninger c...@koeninger.org wrote: [...]
RE: Submit Spark applications from a machine that doesn't have Java installed
Can't speak to the internals of SparkSubmit and how to reproduce them sans JVM; I guess it would depend on whether you want/need to support various deployment environments (stand-alone, Mesos, YARN, etc.). If you just need YARN, or are looking for a starting point, you might want to look at the capabilities of the YARN REST API: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Applications_APISubmit_Application

From: Nick Chammas nicholas.cham...@gmail.com
Sent: Sunday, January 11, 2015 1:45 PM
To: user@spark.apache.org
Subject: Submit Spark applications from a machine that doesn't have Java installed

Is it possible to submit a Spark application to a cluster from a machine that does not have Java installed? [...]
RE: Does DecisionTree model in MLlib deal with missing values?
Is there any plan to extend the data types that would be accepted by the tree models in Spark? E.g., many models that we build contain a large number of string-based categorical factors. Currently the only strategy is to map these string values to integers, and store the mapping so the data can be remapped when the model is scored (see the sketch after this message). A viable solution, but cumbersome for models with hundreds of these kinds of factors.

Concerning missing data, I haven't been able to figure out how to use NULL values in LabeledPoints, and I'm not sure whether DecisionTrees correctly handle the case of missing data. The only thing I've been able to work out is to use a placeholder value, which is not really what is needed. I think this will introduce bias into the model if there is a significant proportion of missing data. E.g., suppose we have a factor that is TimeSpentonX. If 20% of values are missing, what numeric value should the missing data be replaced with? Almost every choice will bias the final model... what we really want is for the algorithm to just ignore those values.

cheers
chris

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Sunday, 11 January 2015 10:53 PM
To: Carter
Cc: user@spark.apache.org
Subject: Re: Does DecisionTree model in MLlib deal with missing values?

[...]
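Until such support exists, here is a minimal hedged sketch of the mapping approach described above; the Record type, field names, and the `records` RDD are made up for illustration:

    import org.apache.spark.SparkContext._
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    case class Record(label: Double, country: String, timeSpent: Double)

    // Build the string -> index mapping once and keep it, so data scored later
    // can be encoded identically.
    val countryIndex: Map[String, Int] =
      records.map(_.country).distinct().collect().sorted.zipWithIndex.toMap

    val data = records.map { r =>
      LabeledPoint(r.label,
        Vectors.dense(countryIndex(r.country).toDouble, r.timeSpent))
    }
    // then train with categoricalFeaturesInfo = Map(0 -> countryIndex.size)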
Re: propogating edges
It looks to be similar to (though simpler than) the connected components implementation in GraphX. Have you checked that? I have a question though: in your example, the graph is a tree. What is the behavior if it is a more general graph? Cheers, Anwar Rizal.

On Mon, Jan 12, 2015 at 1:02 AM, dizzy5112 dave.zee...@gmail.com wrote: [...]
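For the tree-shaped case as posed, here is a hedged, untested sketch with GraphX's Pregel API. It assumes a graph: Graph[Int, Int] whose attributes can be overwritten, assumes each non-root vertex is reached through exactly one root branch (Anwar's question about general graphs is exactly where this breaks down), and assumes the number of roots is small enough to collect to the driver:

    import org.apache.spark.SparkContext._
    import org.apache.spark.graphx._

    // 1) Roots: vertices that never appear with an in-edge.
    val rootIds = graph.vertices.map(_._1)
      .subtract(graph.inDegrees.map(_._1))
      .collect().toSet

    // 2) Number each root's out-links 1, 2, 3, ... and seed the child vertices.
    val seeds = graph.edges
      .filter(e => rootIds.contains(e.srcId))
      .map(e => (e.srcId, e.dstId))
      .groupByKey()
      .flatMap { case (_, dsts) =>
        dsts.zipWithIndex.map { case (dst, i) => (dst, i + 1) }
      }

    // 3) Push each branch number down the tree (0 means "not labeled yet").
    val labeled = graph.outerJoinVertices(seeds) { (_, _, b) => b.getOrElse(0) }
      .pregel(initialMsg = 0)(
        (_, attr, msg) => math.max(attr, msg),
        t => if (t.srcAttr > 0 && t.dstAttr == 0) Iterator((t.dstId, t.srcAttr))
             else Iterator.empty,
        (a, b) => math.max(a, b))

    // 4) An edge's label is the branch number at its endpoints (roots stay 0).
    val result = labeled.mapTriplets(t => math.max(t.srcAttr, t.dstAttr))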
Re: Issue writing to Cassandra from Spark
Hi Akhil, thank you for your response. Actually, we are first reading from Cassandra and then writing back after doing some processing. All the reader stages succeed with no error, and many writer stages also succeed, but many fail as well. Thanks, Ankur

On Sat, Jan 10, 2015 at 10:15 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Just make sure you are not connecting to the old RPC port (9160); the new binary port runs on 9042. What is your rpc_address listed in cassandra.yaml? Also make sure you have start_native_transport: true in the yaml file. Thanks, Best Regards

On Sat, Jan 10, 2015 at 8:44 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi, we are currently using Spark to join data in Cassandra and then write the results back into Cassandra. While reads happen without any error, during the writes we see many exceptions like the one below. Our environment details are:
- Spark v1.1.0
- spark-cassandra-connector-java_2.10 v1.1.0

We are using the below settings for the writer:
spark.cassandra.output.batch.size.rows=1
spark.cassandra.output.concurrent.writes=1

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: [] - use getErrors() for details)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:108)
at com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:179)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Thanks, Ankur
propogating edges
Hi all, I'm looking for some help propagating some values on edges. What I want to achieve (see diagram) is, for each connected part of the graph, to assign an incrementing value to each of the out-links from the root node. This value restarts again for the next part of the graph; i.e., node 1 has out-links to nodes 2, 3, and 4, so the edge attributes for these will be 1, 2, and 3 respectively. Each of the out-links from those nodes keeps this value right through to the final node in its path. For node 9, with an out-link to 10, this has an edge attribute of 1, etc. Thanks in advance for any help :) dave

http://apache-spark-user-list.1001560.n3.nabble.com/file/n21086/Drawing2.jpg
Submit Spark applications from a machine that doesn't have Java installed
Is it possible to submit a Spark application to a cluster from a machine that does not have Java installed? My impression is that many, many more computers come with Python installed by default than with Java. I want to write a command-line utility (https://issues.apache.org/jira/browse/SPARK-3499) that submits a Spark application to a remote cluster. I want that utility to run on as many machines as possible out of the box, so I want to avoid a dependency on Java (or a JRE) if possible. Nick
How to recovery application running records when I restart Spark master?
Hi all, for various reasons I restarted the Spark master node. Before I restarted it, there were records of running applications at the bottom of the master web page, but they are gone after the restart. The records include application name, running time, status, and so on; I am sure you know what I am talking about. My questions are: 1) How can I recover those records after restarting my Spark master node? 2) If I cannot recover them, where should I go to look for them? Actually, I care about the running time of finished applications. Thank you for your help, and I hope everybody is doing great!
Re: Data locality running Spark on Mesos
I tried two Spark stand-alone configurations.

First configuration:
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=1g
SPARK_WORKER_INSTANCES=6
spark.driver.memory 1g
spark.executor.memory 1g
spark.storage.memoryFraction 0.9
--total-executor-cores 60

Second configuration (same as the first, but):
SPARK_WORKER_CORES=6
SPARK_WORKER_MEMORY=6g
SPARK_WORKER_INSTANCES=1
spark.executor.memory 6g

Runs using the first configuration have faster execution times compared with Spark runs on my configuration of Mesos (both coarse-grained and fine-grained). Runs using the second configuration had about the same execution time as with Mesos. Looking at the logs again, it looks like the locality info between the stand-alone and Mesos coarse-grained modes is very similar. I must have been hallucinating earlier, thinking the data locality information was somehow different.

So this whole thing might simply be due to the fact that it is not currently possible in Mesos to set the number of executors. Even in fine-grained mode, there seems to be just one executor per node (I had thought differently in my previous message). The workloads I've tried apparently perform better with many executors per node than with a single powerful executor per node. It would be really useful once the feature you've mentioned, https://issues.apache.org/jira/browse/SPARK-5095, is implemented.

Spark on Mesos fine-grained configuration:
driver memory = 1G
spark.executor.memory 6g (tried also with 1g; still one executor per node, and execution time roughly the same)
spark.storage.memoryFraction 0.9

Mike

From: Timothy Chen t...@mesosphere.io
To: Michael V Le/Watson/IBM@IBMUS
Cc: user user@spark.apache.org
Date: 01/10/2015 04:31 AM
Subject: Re: Data locality running Spark on Mesos

Hi Michael, I see you capped the cores to 60. I wonder what settings you used for standalone mode that you compared with? I can try to run an MLlib workload on both to compare. Tim

On Jan 9, 2015, at 6:42 AM, Michael V Le m...@us.ibm.com wrote: Hi Tim, thanks for your response. The benchmark I used just reads data in from HDFS and builds the Linear Regression model using methods from MLlib. Unfortunately, for various reasons, I can't open the source code for the benchmark at this time. I will try to replicate the problem using some sample benchmarks provided with the vanilla Spark distribution. It is very possible that I have something very screwy in my workload or setup.

The parameters I used for Spark on Mesos are the following:
driver memory = 1G
total-executor-cores = 60
spark.executor.memory 6g
spark.storage.memoryFraction 0.9
spark.mesos.coarse = true

The rest are default values, so spark.locality.wait should just be 3000ms. I launched the Spark job on a separate node from the 10-node cluster using spark-submit.

With regards to Mesos in fine-grained mode, do you have a feel for the overhead of launching executors for every task? Of course, any perceived slowdown will probably be very dependent on the workload. I just want to have a feel for the possible overhead (e.g., a factor of 2 or 3 slowdown?). If not a data locality issue, perhaps this overhead can be a factor in the slowdown I observed, at least in the fine-grained case. BTW: I'm using Spark ver 1.1.0 and Mesos ver 0.20.0. Thanks, Mike
From: Tim Chen t...@mesosphere.io
To: Michael V Le/Watson/IBM@IBMUS
Cc: user user@spark.apache.org
Date: 01/08/2015 03:04 PM
Subject: Re: Data locality running Spark on Mesos

How did you run this benchmark, and is there an open version I can try it with? And what are your configurations, like spark.locality.wait, etc.? Tim

On Thu, Jan 8, 2015 at 11:44 AM, mvle m...@us.ibm.com wrote: Hi, I've noticed that running Spark apps on Mesos is significantly slower compared to stand-alone or Spark on YARN. I don't think it should be the case, so I am posting the problem here in case someone has an explanation or can point me to some configuration options I've missed. I'm running the LinearRegression benchmark with a dataset of 48.8GB. On a 10-node stand-alone Spark cluster (each node 4-core, 8GB of RAM), I can finish the workload in about 5 min (I don't remember exactly). The data is loaded into HDFS spanning the same 10-node cluster. There are 6 worker instances per node. However, when running the same workload on the same cluster but now with Spark on Mesos
Re: ALS.trainImplicit running out of mem when using higher rank
This seems to have sorted it. Awesome, thanks for the great help. Antony.

On Sunday, 11 January 2015, 13:02, Sean Owen so...@cloudera.com wrote: [...]
Re: Trouble with large Yarn job
Hi Anders, have you checked your NodeManager logs to make sure YARN isn't killing executors for exceeding memory limits? -Sandy

On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg arp...@spotify.com wrote: Hey, I have a job that keeps failing if too much data is processed, and I can't see how to get it working. I've tried repartitioning with more partitions and increasing the amount of memory for the executors (now about 12G and 400 executors). Here is a snippet of the first part of the code, which succeeds without any problems:

    val all_days = sc.union(
      ds.dateInterval(startDate, date).map(date =>
        sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
          .map(s => (
            (s.getUsername, s.getTrackUri),
            UserItemData(s.getUsername, s.getTrackUri,
              build_vector1(date, s),
              build_vector2(s))
          ))
      )
    ).reduceByKey(sum_vectors)

I want to process 30 days of data or more, but am only able to process about 10 days. With more days of data (a lower startDate in the code above), the union above succeeds but the code below fails with "Error communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL for more detailed error messages). Here is a snippet of the code that fails:

    val top_tracks = all_days.map(t => (t._1._2.toString, 1)).reduceByKey(_ + _)
      .filter(trackFilter)
      .repartition(4)
      .persist(StorageLevel.MEMORY_AND_DISK_SER)

    val observation_data = all_days
      .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
      .join(top_tracks)

The calculation of top_tracks works, but the last mapPartitions task fails with the given error message if given more than 10 days of data. I also tried increasing the spark.akka.askTimeout setting, but it still fails even after ten-folding the timeout to 300 seconds. I'm using Spark 1.2 and Kryo serialization. I realize this is a rather long message, but I'm stuck and would appreciate any help or clues for resolving this issue. It seems to be an out-of-memory issue, but increasing the number of partitions does not seem to help. Thanks, Anders
Support for SQL on unions of tables (merge tables?)
Dear List, what are common approaches for querying over a union of tables / RDDs? E.g., suppose I have a collection of log files in HDFS, one log file per day, and I want to compute the sum of some field over a date range in SQL. Using the log schema, I can read each file as a distinct SchemaRDD, but I want to union them all and query against one 'table'.

If this data were in MySQL, I could have a table for each day of data and use a MyISAM merge table to union these tables together, then just query against the merge table. What's nice here is that MySQL persists the merge table, and the merge table is r/w, so one can just update the merge table once per day. (What's not nice is that merge tables scale poorly, backup admin is a pain, and, oh hey, I'd like to use Spark, not MySQL.)

One naive and untested idea (that achieves implicit persistence): scan an HDFS directory for log files, create one RDD per file, union() the RDDs, then create a SchemaRDD from that union() (see the sketch after this message).

A few specific questions:
* Any good approaches to a merge / union table? (Other than the naive idea above.) Preferably with some way to persist that table / RDD between Spark runs. (How does Impala approach this problem?)
* Has anybody tried joining against such a union of tables / RDDs on a very large amount of data? When I've tried (non-Spark-SQL) union()ing Sequence Files and then join()ing them against another RDD, Spark seems to try to compute the full union before doing any join() computation (and eventually OOMs the cluster because the union of Sequence Files is so big). I haven't tried something similar with Spark SQL.
* Are there any plans related to this on the Spark roadmap? (This feature would be a nice complement to, say, persistent RDD indices for interactive querying.)
* Related question: are there plans to use Parquet index pages to make Spark SQL faster? E.g., log indices over date ranges would be relevant here.

All the best, -Paul
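On the naive idea above, a hedged sketch of how it can be expressed in Spark 1.2: SchemaRDD supports unionAll, and the union can be registered as one temp table (paths and column names below are hypothetical). Note the temp table lives only for the lifetime of the SQLContext, so it must be re-registered each run rather than persisted:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)

    // One SchemaRDD per daily Parquet file, unioned into a single queryable table.
    val days = Seq("2015-01-01", "2015-01-02", "2015-01-03")
    val logs = days
      .map(d => sqlContext.parquetFile(s"/logs/$d"))
      .reduce(_ unionAll _)
    logs.registerTempTable("logs")

    sqlContext.sql("SELECT SUM(bytes) FROM logs").collect().foreach(println)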
what is the roadmap for Spark SQL dialect in the coming releases?
Hi, I found that Spark SQL currently supports only a relatively small subset of the SQL dialect. I would like to know the roadmap for the coming releases. And are you focusing more on popularizing the 'Hive on Spark' SQL dialect or the Spark SQL dialect? Rgds -- Niranda
Re: Issue writing to Cassandra from Spark
I see; can you paste the piece of code? It's probably because you are exceeding the number of connections specified in the property rpc_max_threads. Make sure you close all the connections properly. Thanks, Best Regards

On Mon, Jan 12, 2015 at 7:45 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote: [...]