Re: Spark 1.2.x Yarn Auxiliary Shuffle Service
Is this what you are looking for? 1. Build Spark with the YARN profile (http://spark.apache.org/docs/1.2.0/building-spark.html). Skip this step if you are using a pre-packaged distribution. 2. Locate the spark-&lt;version&gt;-yarn-shuffle.jar. This should be under $SPARK_HOME/network/yarn/target/scala-&lt;version&gt; if you are building Spark yourself, and under lib if you are using a distribution. 3. Add this jar to the classpath of all NodeManagers in your cluster. 4. In the yarn-site.xml on each node, add spark_shuffle to yarn.nodemanager.aux-services, then set yarn.nodemanager.aux-services.spark_shuffle.class to org.apache.spark.network.yarn.YarnShuffleService. Additionally, set all relevant spark.shuffle.service.* configurations (http://spark.apache.org/docs/1.2.0/configuration.html). 5. Restart all NodeManagers in your cluster. On Wed, Jan 28, 2015 at 1:30 AM, Corey Nolet cjno...@gmail.com wrote: I've read that this is supposed to be a rather significant optimization to the shuffle system in 1.1.0 but I'm not seeing much documentation on enabling this in YARN. I see GitHub classes for it in 1.2.0 and a property spark.shuffle.service.enabled in spark-defaults.conf. The code mentions that this is supposed to be run inside the NodeManager, so I'm assuming it needs to be wired up in yarn-site.xml under the yarn.nodemanager.aux-services property? -- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
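For reference, the yarn-site.xml fragment described in step 4 would look roughly like the following (keep whatever value yarn.nodemanager.aux-services already has, typically mapreduce_shuffle, alongside spark_shuffle):

```xml
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```

On the application side, the main relevant setting is spark.shuffle.service.enabled true in spark-defaults.conf (or via SparkConf).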
Re: generate a random matrix with uniform distribution
Thanks a lot! Can I ask why this code generates a uniform distribution? If dist is N(0,1) data should be N(-1, 2). Let me know. Thanks, Luca 2015-02-07 3:00 GMT+00:00 Burak Yavuz brk...@gmail.com: Hi, You can do the following: ``` import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.random._ // sc is the spark context, numPartitions is the number of partitions you want the RDD to be in val dist: RDD[Vector] = RandomRDDs.normalVectorRDD(sc, n, k, numPartitions, seed) // make the distribution uniform between (-1, 1) val data = dist.map(_ * 2 - 1) val matrix = new RowMatrix(data, n, k) On Feb 6, 2015 11:18 AM, Donbeo lucapug...@gmail.com wrote: Hi I would like to know how can I generate a random matrix where each element come from a uniform distribution in -1, 1 . In particular I would like the matrix be a distributed row matrix with dimension n x p Is this possible with mllib? Should I use another library? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/generate-a-random-matrix-with-uniform-distribution-tp21538.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
OutOfMemoryError: Java heap space
Hi, I just found the following errors during computation(graphx), anyone has ideas on this? thanks so much! (I think the memory is sufficient, spark.executor.memory 30GB ) 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 7653) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) 15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-15,5,main] java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at 
com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at
Need a spark application.
Hi, Can someone please suggest some real life application implemented in spark ( things like gene sequencing) that is of type below code. Basically, the application should have jobs submitted via as many threads as possible. I need similar kind of spark application for benchmarking. val threadA = new Thread(new Runnable { def run() { for(i- 0 until end) { val numAs = logData.filter(line = line.contains(a)) // numAs.saveAsTextFile(hdfs:/t1) println(Lines with a: %s.format(numAs.count)) } } }) val threadB = new Thread(new Runnable { def run() { for(i- 0 until end) { val numBs = logData.filter(line = line.contains(b)) // numBs.saveAsTextFile(hdfs:/t2) println(Lines with b: %s.format(numBs.count)) } } }) val threadC = new Thread(new Runnable { def run() { for(i- 0 until end) { val numCs = logData.filter(line = line.contains(c)) // numCs.saveAsTextFile(hdfs:/t3) println(Lines with c: %s.format( numCs.count)) } } }) val threadD = new Thread(new Runnable { def run() { for(i- 0 until end) { val numDs = logData.filter(line = line.contains(d)) // numDs.saveAsTextFile(hdfs:/t4) println(Lines with d: %s.format( numDs.count)) } } }) Regards Karthik -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Need-a-spark-application-tp21552.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
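The arrows and string quotes in the code above were lost in the archive; a cleaned-up, self-contained sketch of the same pattern is below (the input path, iteration count, and search strings are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MultiThreadedCounts {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MultiThreadedCounts"))
    val logData = sc.textFile("hdfs:///input/log.txt").cache() // placeholder path
    val end = 10                                               // jobs per thread

    // One thread per search string; each thread submits its own jobs to the
    // shared SparkContext, so jobs from different threads run concurrently.
    val threads = Seq("a", "b", "c", "d").map { s =>
      new Thread(new Runnable {
        def run(): Unit = {
          for (i <- 0 until end) {
            val matches = logData.filter(line => line.contains(s))
            println("Lines with %s: %s".format(s, matches.count()))
          }
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    sc.stop()
  }
}
```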
Re: Custom streaming receiver slow on YARN
Replying to my own thread: I realized that this only happens when the replication level is 1. Regardless of whether I set memory-only, disk, or deserialized storage, I had to make the replication level 2 to make the streaming work properly on YARN. I still don't get why, because intuitively less replication should imply faster computation, and when testing on a Cloudera VM everything worked fine on YARN. If I am missing something important, please let me know. I am going to settle on the '..._2' variants for now. Jong Wook -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-streaming-receiver-slow-on-YARN-tp21544p21553.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
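For readers hitting the same thing: the '..._2' variants mentioned above are simply the replicated storage levels handed to the receiver. A minimal sketch (MyReceiver stands in for whatever custom receiver is being used; ssc is an existing StreamingContext):

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical custom receiver; the only change for the workaround described
// above is passing a replicated storage level to the Receiver constructor.
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {
  def onStart(): Unit = { /* start a thread that calls store(...) */ }
  def onStop(): Unit = { }
}

val stream = ssc.receiverStream(new MyReceiver)
```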
Spark SQL - Point lookup optimisation in SchemaRDD?
Hi All, I have a use case where I have cached my SchemaRDD and I want to launch executors only on the partition that I know of (the prime use case of PartitionPruningRDD). I tried something like the following: val partitionIdx = 2 val schemaRdd = hiveContext.table("myTable") // myTable is cached in memory val partitionPrunedRDD = new PartitionPruningRDD(schemaRdd, _ == partitionIdx) val partitionSchemaRDD = hiveContext.applySchema(partitionPrunedRDD, schemaRdd.schema) partitionSchemaRDD.registerTempTable("myTablePartition2") hiveContext.hql("select * from myTablePartition2 where id=10001") If I do this, a query I expect to run in 500 ms takes 3000-4000 ms. I think this is happening because I did applySchema and lost the queryExecution plan. But if I do partitionSchemaRDD.cache as well, then I get the 500 ms performance, although in this case the same partition/data is getting cached twice. My question is: can we create a PartitionPruningCachedSchemaRDD-like class which can prune the partitions of InMemoryColumnarTableScan's RDD[CachedBatch] and launch executors on just the selected partition(s)? Thanks -Nitin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Point-lookup-optimisation-in-SchemaRDD-tp21555.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
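For anyone trying the same approach, here is a hedged sketch of the pruning code above against Spark 1.2 APIs (the table name, partition index, and query are the poster's placeholders; this does not solve the double-caching issue raised in the question):

```scala
import org.apache.spark.rdd.PartitionPruningRDD

val partitionIdx = 2
val schemaRdd = hiveContext.table("myTable")   // already cached in memory

// keep only the partition we care about
val pruned = PartitionPruningRDD.create(schemaRdd, idx => idx == partitionIdx)

// re-attach the schema; this builds a fresh SchemaRDD, so the original query
// plan (and the in-memory columnar cache) is not reused -- hence the slowdown
val prunedSchemaRdd = hiveContext.applySchema(pruned, schemaRdd.schema)
prunedSchemaRdd.registerTempTable("myTablePartition2")
hiveContext.sql("SELECT * FROM myTablePartition2 WHERE id = 10001").collect()
```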
Re: Spark Driver Host under Yarn
Are you running in yarn-cluster or yarn-client mode? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Driver-Host-under-Yarn-tp21536p21556.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Driver Host under Yarn
Yarn-cluster. When I run in yarn-client mode, the driver just runs on the machine that runs spark-submit. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Driver-Host-under-Yarn-tp21536p21558.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark (yarn-client mode) Hangs in final stages of Collect or Reduce
Have you checked the corresponding executor logs as well? The information you have provided here isn't really enough to understand your issue. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-yarn-client-mode-Hangs-in-final-stages-of-Collect-or-Reduce-tp21551p21557.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: saveAsTextFile of RDD[Array[Any]]
If you have an `RDD[Array[Any]]` you can do `rdd.map(_.mkString("\t"))` (or use some other delimiter) to make it an `RDD[String]`, and then call `saveAsTextFile`. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-of-RDD-Array-Any-tp21548p21554.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
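A minimal end-to-end version of that suggestion (the sample data and delimiter are arbitrary):

```scala
import org.apache.spark.rdd.RDD

val rdd: RDD[Array[Any]] = sc.parallelize(Seq(
  Array[Any]("id1", 1, 2.5),
  Array[Any]("id2", 3, 4.0)
))

// join each row's elements with a tab to get an RDD[String], then write it out
rdd.map(_.mkString("\t")).saveAsTextFile("hdfs:///output/rows")
```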
Re: generate a random matrix with uniform distribution
Sorry about that, yes, it should be uniformVectorRDD. Thanks Sean! Burak On Mon, Feb 9, 2015 at 2:05 AM, Sean Owen so...@cloudera.com wrote: Yes the example given here should have used uniformVectorRDD. Then it's correct. On Mon, Feb 9, 2015 at 9:56 AM, Luca Puggini lucapug...@gmail.com wrote: Thanks a lot! Can I ask why this code generates a uniform distribution? If dist is N(0,1) data should be N(-1, 2). Let me know. Thanks, Luca 2015-02-07 3:00 GMT+00:00 Burak Yavuz brk...@gmail.com: Hi, You can do the following: ``` import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.random._ // sc is the spark context, numPartitions is the number of partitions you want the RDD to be in val dist: RDD[Vector] = RandomRDDs.normalVectorRDD(sc, n, k, numPartitions, seed) // make the distribution uniform between (-1, 1) val data = dist.map(_ * 2 - 1) val matrix = new RowMatrix(data, n, k) On Feb 6, 2015 11:18 AM, Donbeo lucapug...@gmail.com wrote: Hi I would like to know how can I generate a random matrix where each element come from a uniform distribution in -1, 1 . In particular I would like the matrix be a distributed row matrix with dimension n x p Is this possible with mllib? Should I use another library? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/generate-a-random-matrix-with-uniform-distribution-tp21538.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
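With that correction applied, the snippet from the quoted message becomes the following (n, k, numPartitions, and seed are placeholders; the rescaling is written element-wise because mllib Vectors don't support arithmetic operators directly):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.random.RandomRDDs

val n = 1000L          // rows
val k = 10             // columns
val numPartitions = 4
val seed = 42L

// uniform on (0, 1) for each of the k entries in each of the n row vectors
val dist = RandomRDDs.uniformVectorRDD(sc, n, k, numPartitions, seed)

// rescale each entry to (-1, 1)
val data = dist.map(v => Vectors.dense(v.toArray.map(_ * 2 - 1)))
val matrix = new RowMatrix(data, n, k)
```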
running spark project using java -cp command
hi experts! Is there any way to run spark application using java -cp command ? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/running-spark-project-using-java-cp-command-tp21567.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: running spark project using java -cp command
Yes like this: /usr/lib/jvm/java-7-openjdk-i386/bin/java -cp ::/home/akhld/mobi/localcluster/spark-1/conf:/home/akhld/mobi/localcluster/spark-1/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-core-3.2.2.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-rdbms-3.2.1.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main spark-shell It launches spark-shell. Thanks Best Regards On Tue, Feb 10, 2015 at 11:36 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: hi experts! Is there any way to run spark application using java -cp command ? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/running-spark-project-using-java-cp-command-tp21567.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to create spark AMI in AWS
Hi Nicholas, Thanks for your quick reply. I'd like to try to build a image with create_image.sh. Then let's see how we can launch spark cluster in region cn-north-1. Guodong On Tue, Feb 10, 2015 at 3:59 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Guodong, spark-ec2 does not currently support the cn-north-1 region, but you can follow [SPARK-4241](https://issues.apache.org/jira/browse/SPARK-4241) to find out when it does. The base AMI used to generate the current Spark AMIs is very old. I'm not sure anyone knows what it is anymore. What I know is that it is an Amazon Linux AMI. Yes, the create_image.sh script is what is used to generate the current Spark AMI. Nick On Mon Feb 09 2015 at 3:27:13 AM Franc Carter franc.car...@rozettatech.com wrote: Hi, I'm very new to Spark, but experienced with AWS - so take that in to account with my suggestions. I started with an AWS base image and then added the pre-built Spark-1.2. I then added made a 'Master' version and a 'Worker' versions and then made AMIs for them. The Master comes up with a static IP and the Worker image has this baked in. I haven't completed everything I am planning to do but so far I can bring up the Master and a bunch of Workers inside and ASG and run spark code successfully. cheers On Mon, Feb 9, 2015 at 10:06 PM, Guodong Wang wangg...@gmail.com wrote: Hi guys, I want to launch spark cluster in AWS. And I know there is a spark_ec2.py script. I am using the AWS service in China. But I can not find the AMI in the region of China. So, I have to build one. My question is 1. Where is the bootstrap script to create the Spark AMI? Is it here( https://github.com/mesos/spark-ec2/blob/branch-1.3/create_image.sh) ? 2. What is the base image of the Spark AMI? Eg, the base image of this ( https://github.com/mesos/spark-ec2/blob/branch-1.3/ami-list/us-west-1/hvm ) 3. Shall I install scala during building the AMI? Thanks. Guodong -- *Franc Carter* | Systems Architect | Rozetta Technology franc.car...@rozettatech.com franc.car...@rozettatech.com| www.rozettatechnology.com Tel: +61 2 8355 2515 Level 4, 55 Harrington St, The Rocks NSW 2000 PO Box H58, Australia Square, Sydney NSW 1215 AUSTRALIA
Re: Will Spark serialize an entire Object or just the method referred in an object?
Hi Marcelo, Thanks for the explanation! So you mean in this way, actually only the output of the map closure would need to be serialized so that it could be passed further for other operations (maybe reduce or else)? And we don't have to worry about Utils.funcX because for each closure instance we would load a new instance containing the func1 and func2 from jars that are already cached into local nodes? Thanks, Yitong 2015-02-09 14:35 GMT-08:00 Marcelo Vanzin van...@cloudera.com: `func1` and `func2` never get serialized. They must exist on the other end in the form of a class loaded by the JVM. What gets serialized is an instance of a particular closure (the argument to your map function). That's a separate class. The instance of that class that is serialized contains references to all other instances it needs to execute its apply method (or run or whatever is the correct method name). In this case, nothing is needed, since all it does is pass its argument in a call to a static method (Util.func1). Hope that helps, these things can be really confusing. You can play with javap -c to disassemble the class files to understand better how it all happens under the hood. On Mon, Feb 9, 2015 at 1:56 PM, Yitong Zhou timyit...@gmail.com wrote: If we define an Utils object: object Utils { def func1 = {..} def func2 = {..} } And then in a RDD we refer to one of the function: rdd.map{r = Utils.func1(r)} Will Utils.func2 also get serialized or not? Thanks, Yitong -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Will-Spark-serialize-an-entire-Object-or-just-the-method-referred-in-an-object-tp21566.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo
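A compact illustration of what Marcelo describes, using the same hypothetical Utils object from the question:

```scala
object Utils {
  def func1(s: String): String = s.toUpperCase
  def func2(s: String): String = s.reverse
}

val rdd = sc.parallelize(Seq("spark", "closure"))

// The anonymous function below compiles to its own class. Only an *instance*
// of that class is serialized and shipped to executors; it captures nothing
// and simply calls Utils.func1. Neither func1 nor func2 is serialized -- their
// bytecode just has to be on the executors' classpath (e.g. in the app jar).
val upper = rdd.map(r => Utils.func1(r))
```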
Re: How to create spark AMI in AWS
OK, good luck! On Mon Feb 09 2015 at 6:41:14 PM Guodong Wang wangg...@gmail.com wrote: Hi Nicholas, Thanks for your quick reply. I'd like to try to build a image with create_image.sh. Then let's see how we can launch spark cluster in region cn-north-1. Guodong On Tue, Feb 10, 2015 at 3:59 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Guodong, spark-ec2 does not currently support the cn-north-1 region, but you can follow [SPARK-4241](https://issues.apache.org/jira/browse/SPARK-4241) to find out when it does. The base AMI used to generate the current Spark AMIs is very old. I'm not sure anyone knows what it is anymore. What I know is that it is an Amazon Linux AMI. Yes, the create_image.sh script is what is used to generate the current Spark AMI. Nick On Mon Feb 09 2015 at 3:27:13 AM Franc Carter franc.car...@rozettatech.com wrote: Hi, I'm very new to Spark, but experienced with AWS - so take that in to account with my suggestions. I started with an AWS base image and then added the pre-built Spark-1.2. I then added made a 'Master' version and a 'Worker' versions and then made AMIs for them. The Master comes up with a static IP and the Worker image has this baked in. I haven't completed everything I am planning to do but so far I can bring up the Master and a bunch of Workers inside and ASG and run spark code successfully. cheers On Mon, Feb 9, 2015 at 10:06 PM, Guodong Wang wangg...@gmail.com wrote: Hi guys, I want to launch spark cluster in AWS. And I know there is a spark_ec2.py script. I am using the AWS service in China. But I can not find the AMI in the region of China. So, I have to build one. My question is 1. Where is the bootstrap script to create the Spark AMI? Is it here( https://github.com/mesos/spark-ec2/blob/branch-1.3/create_image.sh) ? 2. What is the base image of the Spark AMI? Eg, the base image of this ( https://github.com/mesos/spark-ec2/blob/branch-1.3/ami-list/us-west-1/hvm ) 3. Shall I install scala during building the AMI? Thanks. Guodong -- *Franc Carter* | Systems Architect | Rozetta Technology franc.car...@rozettatech.com franc.car...@rozettatech.com| www.rozettatechnology.com Tel: +61 2 8355 2515 Level 4, 55 Harrington St, The Rocks NSW 2000 PO Box H58, Australia Square, Sydney NSW 1215 AUSTRALIA
Re: textFile partitions
The partitions parameter to textFile is the minPartitions. So there will be at least that level of parallelism. Spark delegates to Hadoop to create the splits for that file (yes, even for a text file on disk and not hdfs). You can take a look at the code in FileInputFormat - but briefly it will compute the block size to use and create at least the number of partitions passed into it. It can create more blocks. Hope this helps, Kostas On Mon, Feb 9, 2015 at 8:00 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi folks, puzzled by something pretty simple: I have a standalone cluster with default parallelism of 2, spark-shell running with 2 cores sc.textFile(README.md).partitions.size returns 2 (this makes sense) sc.textFile(README.md).coalesce(100,true).partitions.size returns 100, also makes sense but sc.textFile(README.md,100).partitions.size gives 102 --I was expecting this to be equivalent to last statement (i.e.result in 100 partitions) I'd appreciate if someone can enlighten me as to why I end up with 102 This is on Spark 1.2 thanks
External Data Source in SPARK
Hi, We implemented an external data source by extending TableScan, and we added the classes to the classpath. The data source works fine when run in the Spark shell, but currently we are unable to use this same data source from Python. When we execute the following in an IPython notebook: sqlContext.sql("CREATE TEMPORARY TABLE dataTable USING MyDataSource OPTIONS (partitions '2')") we get the following error: Py4JJavaError: An error occurred while calling o78.sql. : java.lang.RuntimeException: Failed to load class for data source: MyDataSource How can we expose this data source for consumption in the PySpark environment as well? Regards, Santosh
SparkSQL 1.2 and ElasticSearch-Spark 1.4 not working together, NoSuchMethodError problems
Hello Spark community and Holden, I am trying to follow Holden Karau's SparkSQL and ElasticSearch tutorial from Spark Summit 2014. I am trying to use elasticsearch-spark 2.1.0.Beta3 and SparkSQL 1.2 together. https://github.com/holdenk/elasticsearchspark *(Side Note: This very nice tutorial does NOT BUILD with sbt 0.13 and sbt-assembly 0.12.0-M1 and Spark 1.2...so many problems I just gave up. )* In any case, I am trying to work from Holden's tutorial to see ElasticSearch and SparkSQL interoperate, with the data being a bunch of JSON documents in ElasticSearch. The problem is with the following code, at the line with *sqlCtx.esRDD(..., ...):* // this is my def main(args) inside of my test Object val Array(esResource, esNodes) = args.take(2) val conf = new SparkConf().setAppName(TestEsSpark) conf.set(es.index.auto.create, true) val sc = new SparkContext(conf) val sqlCtx = new SQLContext(sc) import sqlCtx._ val query = {query:{match:{schemaName: SuperControllerRequest.json}}} *val searched = sqlCtx.esRDD(esResource, query)* // PROBLEM HERE - println(searched.schema) I can assemble this with sbt assembly, after much work in getting SBT to work. However, at RUN TIME, I have the following output, which complains my sqlCtx.esRDD() has a NoSuchMethodError org.apache.spark.sql.catalyst.types.StructField according to ElasticSearch. This is a nightmare and I cannot get it to work, does anybody know how to make this extremely simple test work? Further below is my SBT build file that I managed to get to work, borrowing from Holden's build.sbt. // RUN time exception Exception in thread main *java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField* .init(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75) at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54) at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54) at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47) at org.elasticsearch.spark.sql.EsSparkSQL$.esRDD(EsSparkSQL.scala:27) at org.elasticsearch.spark.sql.EsSparkSQL$.esRDD(EsSparkSQL.scala:24) at org.elasticsearch.spark.sql.package$SQLContextFunctions.esRDD(package.scala:17) *at testesspark.TestEsSpark$.main(TestEsSpark.scala:41) * // PROBLEM HERE points to above line in code at testesspark.TestEsSpark.main(TestEsSpark.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) //build.sbt import sbtassembly.PathList name := testesspark version := 0.1.1 scalaVersion := 2.10.4 libraryDependencies ++= Seq( org.apache.spark %% spark-core % 1.2.0 exclude(org.eclipse.jetty.orbit, javax.servlet) exclude(org.eclipse.jetty.orbit, javax.transaction) exclude(org.eclipse.jetty.orbit, javax.mail) exclude(org.eclipse.jetty.orbit, javax.activation) exclude(commons-beanutils, commons-beanutils-core) exclude(commons-collections, commons-collections) exclude(com.esotericsoftware.minlog, minlog) exclude(org.slf4j, jcl-over-slf4j ) exclude(org.apache.hadoop,hadoop-yarn-api) exclude(org.slf4j,slf4j-api), org.apache.spark %% spark-sql % 1.2.0 exclude(org.eclipse.jetty.orbit, javax.servlet) exclude(org.eclipse.jetty.orbit, javax.transaction) exclude(org.eclipse.jetty.orbit, javax.mail) exclude(org.eclipse.jetty.orbit, javax.activation) exclude(commons-beanutils, commons-beanutils-core) exclude(commons-collections, commons-collections) exclude(com.esotericsoftware.minlog, minlog) exclude(org.slf4j, jcl-over-slf4j ) exclude(org.slf4j,slf4j-api),
Re: MLLib: feature standardization
`mean()` and `variance()` are not defined in `Vector`. You can use the mean and variance implementation from commons-math3 (http://commons.apache.org/proper/commons-math/javadocs/api-3.4.1/index.html) if you don't want to implement them. -Xiangrui On Fri, Feb 6, 2015 at 12:50 PM, SK skrishna...@gmail.com wrote: Hi, I have a dataset in csv format and I am trying to standardize the features before using k-means clustering. The data does not have any labels but has the following format: s1, f12,f13,... s2, f21,f22,... where s is a string id, and f is a floating point feature value. To perform feature standardization, I need to compute the mean and variance/std deviation of the features values in each element of the RDD (i.e each row). However, the summary Statistics library in Spark MLLib provides only a colStats() method that provides column-wise mean and variance. I tried to compute the mean and variance per row, using the code below but got a compilation error that there is no mean() or variance() method for a tuple or Vector object. Is there a Spark library to compute the row-wise mean and variance for an RDD, where each row (i.e. element) of the RDD is a Vector or tuple of N feature values? thanks My code for standardization is as follows: //read the data val data=sc.textFile(file_name) .map(_.split(,)) // extract the features. For this example I am using only 2 features, but the data has more features val features = data.map(d= Vectors.dense(d(1).toDouble, d(2).toDouble)) val std_features = features.map(f= { val fmean = f.mean() // Error: NO MEAN() for a Vector or Tuple object val fstd= scala.math.sqrt(f.variance())// Error: NO variance() for a Vector or Tuple object for (i - 0 to f.length) // standardize the features { var fs = 0.0 if (fstd 0.0) fs = (f(i) - fmean)/fstd fs } } ) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-feature-standardization-tp21539.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
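The quoted code above lost its `=>` arrows and comparison operators in the archive; here is a hedged sketch of the row-wise standardization, using commons-math3 as suggested (it assumes every column after the id is a numeric feature):

```scala
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("data.csv").map(_.split(","))
val features = data.map(d => Vectors.dense(d.drop(1).map(_.toDouble)))

val stdFeatures = features.map { f =>
  val stats = new DescriptiveStatistics(f.toArray)
  val mean = stats.getMean
  val std = stats.getStandardDeviation
  // standardize each value in the row; leave it unchanged if the row has zero variance
  Vectors.dense(f.toArray.map(x => if (std > 0.0) (x - mean) / std else x))
}
```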
Re: How to create spark AMI in AWS
Guodong, spark-ec2 does not currently support the cn-north-1 region, but you can follow [SPARK-4241](https://issues.apache.org/jira/browse/SPARK-4241) to find out when it does. The base AMI used to generate the current Spark AMIs is very old. I'm not sure anyone knows what it is anymore. What I know is that it is an Amazon Linux AMI. Yes, the create_image.sh script is what is used to generate the current Spark AMI. Nick On Mon Feb 09 2015 at 3:27:13 AM Franc Carter franc.car...@rozettatech.com wrote: Hi, I'm very new to Spark, but experienced with AWS - so take that in to account with my suggestions. I started with an AWS base image and then added the pre-built Spark-1.2. I then added made a 'Master' version and a 'Worker' versions and then made AMIs for them. The Master comes up with a static IP and the Worker image has this baked in. I haven't completed everything I am planning to do but so far I can bring up the Master and a bunch of Workers inside and ASG and run spark code successfully. cheers On Mon, Feb 9, 2015 at 10:06 PM, Guodong Wang wangg...@gmail.com wrote: Hi guys, I want to launch spark cluster in AWS. And I know there is a spark_ec2.py script. I am using the AWS service in China. But I can not find the AMI in the region of China. So, I have to build one. My question is 1. Where is the bootstrap script to create the Spark AMI? Is it here( https://github.com/mesos/spark-ec2/blob/branch-1.3/create_image.sh) ? 2. What is the base image of the Spark AMI? Eg, the base image of this ( https://github.com/mesos/spark-ec2/blob/branch-1.3/ami-list/us-west-1/hvm ) 3. Shall I install scala during building the AMI? Thanks. Guodong -- *Franc Carter* | Systems Architect | Rozetta Technology franc.car...@rozettatech.com franc.car...@rozettatech.com| www.rozettatechnology.com Tel: +61 2 8355 2515 Level 4, 55 Harrington St, The Rocks NSW 2000 PO Box H58, Australia Square, Sydney NSW 1215 AUSTRALIA
Re: Number of goals to win championship
Logistic regression outputs probabilities if the data fits the model assumption. Otherwise, you might need to calibrate its output to correctly read it. You may be interested in reading this: http://fastml.com/classifier-calibration-with-platts-scaling-and-isotonic-regression/. We have isotonic regression implemented in Spark 1.3. Another problem with your input is that the dataset is too small. Try to put more points and see the result. Also, use LogisticRegressionWithLBFGS, which is better than the SGD implementation. -Xiangrui On Thu, Feb 5, 2015 at 10:40 AM, jvuillermet jeremy.vuiller...@gmail.com wrote: I want to find the minimum number of goals for a player that likely allows its team to win the championship. My data : goals win/loose 25 1 5 0 10 1 20 0 After some reading and courses, I think I need a Logistic Regression model to get those datas. I create my LabeledPoint with those data (1/0 being the label) and use val model = LogisticRegressionWithSGD.train model.clearTreshold() I then try some model.predict(Vectors.dense(10)) but don't understand the output. All the results are 0.5 and I'm not even sure how to use the predicted value. Am I using the good model ? How do I read the predicted value ? What do I need more to find a goal number from which it's likely your team will win the championship or say (3/4 chances to win it) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Number-of-goals-to-win-championship-tp21519.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
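A brief sketch of the LBFGS-based suggestion above, using the four points from the question purely for illustration (as noted, a real model needs far more data):

```scala
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val training = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(25.0)),
  LabeledPoint(0.0, Vectors.dense(5.0)),
  LabeledPoint(1.0, Vectors.dense(10.0)),
  LabeledPoint(0.0, Vectors.dense(20.0))
))

val model = new LogisticRegressionWithLBFGS().run(training)
model.clearThreshold() // return raw probabilities instead of 0/1 labels

// estimated probability of winning the championship with 10 goals
println(model.predict(Vectors.dense(10.0)))
```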
Re: no option to add intercepts for StreamingLinearAlgorithm
No particular reason. We didn't add it in the first version. Let's add it in 1.4. -Xiangrui On Thu, Feb 5, 2015 at 3:44 PM, jamborta jambo...@gmail.com wrote: hi all, just wondering if there is a reason why it is not possible to add intercepts for streaming regression models? I understand that run method in the underlying GeneralizedLinearModel does not take intercept as a parameter either. Any reason for that? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/no-option-to-add-intercepts-for-StreamingLinearAlgorithm-tp21526.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [MLlib] Performance issues when building GBM models
Could you check the Spark UI and see whether there are RDDs being kicked out during the computation? We cache the residual RDD after each iteration. If we don't have enough memory/disk, it gets recomputed and results something like `t(n) = t(n-1) + const`. We might cache the features multiple times, which could be improved. -Xiangrui On Sun, Feb 8, 2015 at 5:32 PM, Christopher Thom christopher.t...@quantium.com.au wrote: Hi All, I wonder if anyone else has some experience building a Gradient Boosted Trees model using spark/mllib? I have noticed when building decent-size models that the process slows down over time. We observe that the time to build tree n is approximately a constant time longer than the time to build tree n-1 i.e. t(n) = t(n-1) + const. The implication is that the total build time goes as something like N^2, where N is the total number of trees. I would expect that the algorithm should be approximately linear in total time (i.e. each boosting iteration takes roughly the same time to complete). So I have a couple of questions: 1. Is this behaviour expected, or consistent with what others are seeing? 2. Does anyone know if there a tuning parameters (e.g. in the boosting strategy, or tree stategy) that may be impacting this? All aspects of the build seem to slow down as I go. Here's a random example culled from the logs, from the beginning and end of the model build: 15/02/09 17:22:11 INFO scheduler.DAGScheduler: Job 42 finished: count at DecisionTreeMetadata.scala:111, took 0.077957 s 15/02/09 19:44:01 INFO scheduler.DAGScheduler: Job 7954 finished: count at DecisionTreeMetadata.scala:111, took 5.495166 s Any thoughts or advice, or even suggestions on where to dig for more info would be welcome. thanks chris Christopher Thom QUANTIUM Level 25, 8 Chifley, 8-12 Chifley Square Sydney NSW 2000 T: +61 2 8222 3577 F: +61 2 9292 6444 W: quantium.com.auwww.quantium.com.au linkedin.com/company/quantiumwww.linkedin.com/company/quantium facebook.com/QuantiumAustraliawww.facebook.com/QuantiumAustralia twitter.com/QuantiumAUwww.twitter.com/QuantiumAU The contents of this email, including attachments, may be confidential information. If you are not the intended recipient, any use, disclosure or copying of the information is unauthorised. If you have received this email in error, we would be grateful if you would notify us immediately by email reply, phone (+ 61 2 9292 6400) or fax (+ 61 2 9292 6444) and delete the message from your system. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: no space left at worker node
In other words, the working command is: /root/spark/bin/spark-submit --class com.crowdstar.etl.ParseAndClean --master spark://ec2-54-213-73-150.us-west-2.compute.amazonaws.com:7077 --deploy-mode cluster --total-executor-cores 4 file:///root/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar s3://pixlogstxt/ETL/input/2015/01/28/09/ file:///root/etl-admin/vertica/VERTICA.avdl file:///root/etl-admin/vertica/extras.json file:///root/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar s3://pixlogstxt/ETL/output/ -1 How can I change it to avoid copying the jar file to ./spark/work/app- ? Ey-Chih ChowFrom: eyc...@hotmail.com To: 2dot7kel...@gmail.com CC: gen.tan...@gmail.com; user@spark.apache.org Subject: RE: no space left at worker node Date: Mon, 9 Feb 2015 10:59:00 -0800 Thanks. But, in spark-submit, I specified the jar file in the form of local:/spark-etl-0.0.1-SNAPSHOT.jar. It comes back with the following. What's wrong with this? Ey-Chih Chow === Date: Sun, 8 Feb 2015 22:27:17 -0800Sending launch command to spark://ec2-54-213-73-150.us-west-2.compute.amazonaws.com:7077Driver successfully submitted as driver-20150209185453-0010... waiting before polling master for driver state... polling master for driver stateState of driver-20150209185453-0010 is ERRORException from cluster was: java.io.IOException: No FileSystem for scheme: localjava.io.IOException: No FileSystem for scheme: localat org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1383) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187) at org.apache.spark.deploy.worker.DriverRunner.org$apache$spark$deploy$worker$DriverRunner$$downloadUserJar(DriverRunner.scala:148) at org.apache.spark.deploy.worker.DriverRunner$$anon$1.run(DriverRunner.scala:74) Subject: Re: no space left at worker node From: 2dot7kel...@gmail.com To: eyc...@hotmail.com CC: gen.tan...@gmail.com; user@spark.apache.org Maybe, try with local: under the heading of Advanced Dependency Management here: https://spark.apache.org/docs/1.1.0/submitting-applications.html It seems this is what you want. Hope this help. Kelvin On Sun, Feb 8, 2015 at 9:13 PM, ey-chih chow eyc...@hotmail.com wrote: Is there any way we can disable Spark copying the jar file to the corresponding directory. I have a fat jar and is already copied to worker nodes using the command copydir. Why Spark needs to save the jar to ./spark/work/appid each time a job get started? Ey-Chih Chow Date: Sun, 8 Feb 2015 20:09:32 -0800 Subject: Re: no space left at worker node From: 2dot7kel...@gmail.com To: eyc...@hotmail.com CC: gen.tan...@gmail.com; user@spark.apache.org I guess you may set the parameters below to clean the directories: spark.worker.cleanup.enabledspark.worker.cleanup.intervalspark.worker.cleanup.appDataTtl They are described here: http://spark.apache.org/docs/1.2.0/spark-standalone.html Kelvin On Sun, Feb 8, 2015 at 5:15 PM, ey-chih chow eyc...@hotmail.com wrote: I found the problem is, for each application, the Spark worker node saves the corresponding std output and std err under ./spark/work/appid, where appid is the id of the application. If I ran several applications in a row, it will out of space. 
In my case, the disk usage under ./spark/work/ is as follows: 1689784 ./app-20150208203033-0002/01689788 ./app-20150208203033-000240324 ./driver-20150208180505-00011691400 ./app-20150208180509-0001/01691404 ./app-20150208180509-000140316 ./driver-20150208203030-000240320 ./driver-20150208173156-1649876 ./app-20150208173200-/01649880 ./app-20150208173200-5152036. Any suggestion how to resolve it? Thanks. Ey-Chih ChowFrom: eyc...@hotmail.com To: gen.tan...@gmail.com CC: user@spark.apache.org Subject: RE: no space left at worker node Date: Sun, 8 Feb 2015 15:25:43 -0800 By this way, the input and output paths of the job are all in s3. I did not use paths of hdfs as input or output. Best regards, Ey-Chih Chow From: eyc...@hotmail.com To: gen.tan...@gmail.com CC: user@spark.apache.org Subject: RE: no space left at worker node Date: Sun, 8 Feb 2015 14:57:15 -0800 Hi Gen, Thanks. I save my logs in a file under /var/log. This is the only place to save data. Will the problem go away if I use a better machine? Best regards, Ey-Chih Chow Date: Sun, 8 Feb 2015 23:32:27 +0100 Subject: Re: no space left at worker node From: gen.tan...@gmail.com To: eyc...@hotmail.com CC: user@spark.apache.org Hi, I am sorry that I made a mistake. r3.large has only one SSD which has been mounted in /mnt. Therefore this is no /dev/sdc.In fact, the problem is that there is no space in the under / directory. So you should check whether your application write
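For reference, the three worker cleanup settings named in the quoted reply (spark.worker.cleanup.enabled, spark.worker.cleanup.interval, spark.worker.cleanup.appDataTtl) are worker-side properties, typically set through SPARK_WORKER_OPTS in spark-env.sh on each worker; the values below are only illustrative:

```bash
# spark-env.sh on each standalone worker
SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true \
  -Dspark.worker.cleanup.interval=1800 \
  -Dspark.worker.cleanup.appDataTtl=604800"
```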
Re: getting error when submit spark with master as yarn
Open up 'yarn-site.xml' in your hadoop configuration. You want to create configuration for yarn.nodemanager.resource.memory-mb and yarn.scheduler.maximum-allocation-mb. Have a look here for details on how they work: https://hadoop.apache.org/docs/r2.3.0/hadoop-yarn/hadoop-yarn-common/yarn-default.xml -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/getting-error-when-submit-spark-with-master-as-yarn-tp21542p21547.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
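A rough example of what those two properties look like in yarn-site.xml (the 8192 MB values are placeholders; size them to your NodeManager hosts):

```xml
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>8192</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>8192</value>
</property>
```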
ImportError: No module named pyspark, when running pi.py
*Command:* sudo python ./examples/src/main/python/pi.py *Error:* Traceback (most recent call last): File ./examples/src/main/python/pi.py, line 22, in module from pyspark import SparkContext ImportError: No module named pyspark
Re: ImportError: No module named pyspark, when running pi.py
I think you have to run that using $SPARK_HOME/bin/pyspark /path/to/pi.py instead of normal python pi.py On Mon, Feb 9, 2015 at 11:22 PM, Ashish Kumar ashish.ku...@innovaccer.com wrote: *Command:* sudo python ./examples/src/main/python/pi.py *Error:* Traceback (most recent call last): File ./examples/src/main/python/pi.py, line 22, in module from pyspark import SparkContext ImportError: No module named pyspark -- Mohit When you want success as badly as you want the air, then you will get it. There is no other secret of success. -Socrates
parameter passed for AppendOnlyMap initialCapacity
Hi all, Can any expert show me what can be done to change the initialCapacity of the following? org.apache.spark.util.collection.AppendOnlyMap We have hit problems with this when using Spark to process large data sets during sort-based shuffle. Does Spark offer a configurable parameter for modifying it? Many thanks, fightf...@163.com
RE: no space left at worker node
Thanks. But, in spark-submit, I specified the jar file in the form of local:/spark-etl-0.0.1-SNAPSHOT.jar. It comes back with the following. What's wrong with this? Ey-Chih Chow === Date: Sun, 8 Feb 2015 22:27:17 -0800Sending launch command to spark://ec2-54-213-73-150.us-west-2.compute.amazonaws.com:7077Driver successfully submitted as driver-20150209185453-0010... waiting before polling master for driver state... polling master for driver stateState of driver-20150209185453-0010 is ERRORException from cluster was: java.io.IOException: No FileSystem for scheme: localjava.io.IOException: No FileSystem for scheme: localat org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1383) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187) at org.apache.spark.deploy.worker.DriverRunner.org$apache$spark$deploy$worker$DriverRunner$$downloadUserJar(DriverRunner.scala:148) at org.apache.spark.deploy.worker.DriverRunner$$anon$1.run(DriverRunner.scala:74) Subject: Re: no space left at worker node From: 2dot7kel...@gmail.com To: eyc...@hotmail.com CC: gen.tan...@gmail.com; user@spark.apache.org Maybe, try with local: under the heading of Advanced Dependency Management here: https://spark.apache.org/docs/1.1.0/submitting-applications.html It seems this is what you want. Hope this help. Kelvin On Sun, Feb 8, 2015 at 9:13 PM, ey-chih chow eyc...@hotmail.com wrote: Is there any way we can disable Spark copying the jar file to the corresponding directory. I have a fat jar and is already copied to worker nodes using the command copydir. Why Spark needs to save the jar to ./spark/work/appid each time a job get started? Ey-Chih Chow Date: Sun, 8 Feb 2015 20:09:32 -0800 Subject: Re: no space left at worker node From: 2dot7kel...@gmail.com To: eyc...@hotmail.com CC: gen.tan...@gmail.com; user@spark.apache.org I guess you may set the parameters below to clean the directories: spark.worker.cleanup.enabledspark.worker.cleanup.intervalspark.worker.cleanup.appDataTtl They are described here: http://spark.apache.org/docs/1.2.0/spark-standalone.html Kelvin On Sun, Feb 8, 2015 at 5:15 PM, ey-chih chow eyc...@hotmail.com wrote: I found the problem is, for each application, the Spark worker node saves the corresponding std output and std err under ./spark/work/appid, where appid is the id of the application. If I ran several applications in a row, it will out of space. In my case, the disk usage under ./spark/work/ is as follows: 1689784 ./app-20150208203033-0002/01689788 ./app-20150208203033-000240324 ./driver-20150208180505-00011691400 ./app-20150208180509-0001/01691404 ./app-20150208180509-000140316 ./driver-20150208203030-000240320 ./driver-20150208173156-1649876 ./app-20150208173200-/01649880 ./app-20150208173200-5152036. Any suggestion how to resolve it? Thanks. Ey-Chih ChowFrom: eyc...@hotmail.com To: gen.tan...@gmail.com CC: user@spark.apache.org Subject: RE: no space left at worker node Date: Sun, 8 Feb 2015 15:25:43 -0800 By this way, the input and output paths of the job are all in s3. I did not use paths of hdfs as input or output. Best regards, Ey-Chih Chow From: eyc...@hotmail.com To: gen.tan...@gmail.com CC: user@spark.apache.org Subject: RE: no space left at worker node Date: Sun, 8 Feb 2015 14:57:15 -0800 Hi Gen, Thanks. I save my logs in a file under /var/log. This is the only place to save data. 
Will the problem go away if I use a better machine? Best regards, Ey-Chih Chow Date: Sun, 8 Feb 2015 23:32:27 +0100 Subject: Re: no space left at worker node From: gen.tan...@gmail.com To: eyc...@hotmail.com CC: user@spark.apache.org Hi, I am sorry that I made a mistake. r3.large has only one SSD which has been mounted in /mnt. Therefore this is no /dev/sdc.In fact, the problem is that there is no space in the under / directory. So you should check whether your application write data under this directory(for instance, save file in file:///). If not, you can use watch du -sh to during the running time to figure out which directory is expanding. Normally, only /mnt directory which is supported by SSD is expanding significantly, because the data of hdfs is saved here. Then you can find the directory which caused no space problem and find out the specific reason. CheersGen On Sun, Feb 8, 2015 at 10:45 PM, ey-chih chow eyc...@hotmail.com wrote: Thanks Gen. How can I check if /dev/sdc is well mounted or not? In general, the problem shows up when I submit the second or third job. The first job I submit most likely will succeed. Ey-Chih Chow Date: Sun, 8 Feb 2015 18:18:03 +0100 Subject: Re: no space left at worker node From:
Re: Spark streaming app shutting down
Thanks for the info guys. For now I'm using the high level consumer i will give this one a try. As far as the queries are concerned, check pointing helps. I'm still no t sure whats the best way to gracefully stop the application in yarn cluster mode. On 5 Feb 2015 09:38, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Thanks Akhil for mentioning this Low Level Consumer ( https://github.com/dibbhatt/kafka-spark-consumer ) . Yes it has better fault tolerant mechanism than any existing Kafka consumer available . This has no data loss on receiver failure and have ability to reply or restart itself in-case of failure. You can definitely give it a try . Dibyendu On Thu, Feb 5, 2015 at 1:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote: AFAIK, From Spark 1.2.0 you can have WAL (Write Ahead Logs) for fault tolerance, which means it can handle the receiver/driver failures. You can also look at the lowlevel kafka consumer https://github.com/dibbhatt/kafka-spark-consumer which has a better fault tolerance mechanism for receiver failures. This low level consumer will push the offset of the message being read into zookeeper for fault tolerance. In your case i think mostly the inflight data would be lost if you arent using any of the fault tolerance mechanism. Thanks Best Regards On Wed, Feb 4, 2015 at 5:24 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Sprakans, I'm running a spark streaming app which reads data from kafka topic does some processing and then persists the results in HBase. I am using spark 1.2.0 running on Yarn cluster with 3 executors (2gb, 8 cores each). I've enable checkpointing I am also rate limiting my kafkaReceivers so that the number of items read is not more than 10 records per sec. The kafkaReceiver I'm using is *not* ReliableKafkaReceiver. This app was running fine for ~3 days then there was an increased load on the HBase server because of some other process querying HBase tables. This led to increase in the batch processing time of the spark batches (processed 1 min batch in 10 min) which previously was finishing in 20 sec which in turn led to the shutdown of the spark application, PFA the executor logs. From the logs I'm getting below exceptions *[1]* *[2]* looks like there was some outstanding Jobs that didn't get processed or the Job couldn't find the input data. From the logs it looks seems that the shutdown hook gets invoked but it cannot process the in-flight block. I have a couple of queries on this 1) Does this mean that these jobs failed and the *in-flight data *is lost? 2) Does the Spark job *buffers kafka* input data while the Job is under processing state for 10 mins and on shutdown is that too lost? (I do not see any OOM error in the logs). 3) Can we have *explicit commits* enabled in the kafkaReceiver so that the offsets gets committed only when the RDD(s) get successfully processed? Also I'd like to know if there is a *graceful way to shutdown a spark app running on yarn*. Currently I'm killing the yarn app to stop it which leads to loss of that job's history wheras in this case the application stops and succeeds and thus preserves the logs history. 
*[1]* 15/02/02 19:30:11 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from hbase28.usdc2.cloud.com/10.193.150.221:43189 is closed *[2]* java.lang.Exception: Could not compute split, block input-2-1422901498800 not found *[3]* org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/spark/realtime-failover/msg_2378481654720966.avro (inode 879488): File does not exist. Holder DFSClient_NONMAPREDUCE_-148264920_63 does not have any open files. -- Thanks Regards, *Mukesh Jha me.mukesh@gmail.com* - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
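On the graceful-shutdown question, which the thread doesn't resolve: the StreamingContext API itself offers a graceful stop that lets already-received batches finish before tearing things down. A minimal sketch (not from this thread, and it doesn't by itself address the yarn-kill / job-history issue):

```scala
// ssc is the StreamingContext; let in-flight batches complete, then stop
// both the streaming context and the underlying SparkContext
sys.addShutdownHook {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}
```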
sum of columns in rowMatrix and linear regression
I have a matrix X of type: res39: org.apache.spark.mllib.linalg.distributed.RowMatrix = org.apache.spark.mllib.linalg.distributed.RowMatrix@6cfff1d3 with n rows and p columns I would like to obtain an array S of size n*1 defined as the sum of the columns of X. S will then be replaced by val s2 = s map (x= sin(x)/x) Finally I would like to fit a linear regression between X and s2. How can I do that? Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sum-of-columns-in-rowMatrix-and-linear-regression-tp21563.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
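No reply appears in this digest, but one possible outline with mllib, assuming the RowMatrix X was built from an RDD[Vector] (treat this as a sketch, not a tested solution):

```scala
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import scala.math.sin

val rows = X.rows                      // RDD[Vector]: the n rows of the matrix
val s = rows.map(_.toArray.sum)        // S: the row-wise sums, one value per row
val s2 = s.map(x => if (x != 0.0) sin(x) / x else 1.0)

// pair each row with its transformed sum and fit a linear model
val labeled = rows.zip(s2).map { case (x, y) => LabeledPoint(y, x) }
val model = LinearRegressionWithSGD.train(labeled, 100)
```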
Can spark job server be used to visualize streaming data?
Hello Everyone, I was reading this blog post: http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/ and was wondering if this approach can be taken to visualize streaming data...not just historical data? Thank you! -Suh
Re: SparkSQL 1.2 and ElasticSearch-Spark 1.4 not working together, NoSuchMethodError problems
Hi, Spark 1.2 changed the APIs a bit which is what's causing the problem with es-spark 2.1.0.Beta3. This has been addressed a while back in es-spark proper; you can get a hold of the dev build (the upcoming 2.1.Beta4) here [1]. P.S. Do note that a lot of things have happened in es-hadoop/spark space since Spark Summit '14 and I strongly recommend reading out the docs [2] Cheers, [1] http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/install.html#download-dev [2] http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/spark.html On 2/9/15 9:33 PM, Aris wrote: Hello Spark community and Holden, I am trying to follow Holden Karau's SparkSQL and ElasticSearch tutorial from Spark Summit 2014. I am trying to use elasticsearch-spark 2.1.0.Beta3 and SparkSQL 1.2 together. https://github.com/holdenk/elasticsearchspark /(Side Note: This very nice tutorial does NOT BUILD with sbt 0.13 and sbt-assembly 0.12.0-M1 and Spark 1.2...so many problems I just gave up. )/ In any case, I am trying to work from Holden's tutorial to see ElasticSearch and SparkSQL interoperate, with the data being a bunch of JSON documents in ElasticSearch. The problem is with the following code, at the line with *sqlCtx.esRDD(..., ...):* // this is my def main(args) inside of my test Object val Array(esResource, esNodes) = args.take(2) val conf = new SparkConf().setAppName(TestEsSpark) conf.set(es.index.auto.create, true) val sc = new SparkContext(conf) val sqlCtx = new SQLContext(sc) import sqlCtx._ val query = {query:{match:{schemaName: SuperControllerRequest.json}}} *val searched = sqlCtx.esRDD(esResource, query)* // PROBLEM HERE - println(searched.schema) I can assemble this with sbt assembly, after much work in getting SBT to work. However, at RUN TIME, I have the following output, which complains my sqlCtx.esRDD() has a NoSuchMethodError org.apache.spark.sql.catalyst.types.StructField according to ElasticSearch. This is a nightmare and I cannot get it to work, does anybody know how to make this extremely simple test work? Further below is my SBT build file that I managed to get to work, borrowing from Holden's build.sbt. 
// RUN time exception Exception in thread main *java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField*.init(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75) at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54) at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54) at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47) at org.elasticsearch.spark.sql.EsSparkSQL$.esRDD(EsSparkSQL.scala:27) at org.elasticsearch.spark.sql.EsSparkSQL$.esRDD(EsSparkSQL.scala:24) at org.elasticsearch.spark.sql.package$SQLContextFunctions.esRDD(package.scala:17) *at testesspark.TestEsSpark$.main(TestEsSpark.scala:41) *// PROBLEM HERE points to above line in code at testesspark.TestEsSpark.main(TestEsSpark.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) //build.sbt import sbtassembly.PathList name := testesspark version := 0.1.1 scalaVersion := 2.10.4 libraryDependencies ++= Seq( org.apache.spark %% spark-core % 1.2.0 exclude(org.eclipse.jetty.orbit, javax.servlet) exclude(org.eclipse.jetty.orbit, javax.transaction) exclude(org.eclipse.jetty.orbit, javax.mail) exclude(org.eclipse.jetty.orbit, javax.activation) exclude(commons-beanutils, commons-beanutils-core) exclude(commons-collections, commons-collections)
Re: python api and gzip compression
Found it - used saveAsHadoopFile On Mon, Feb 9, 2015 at 9:11 AM, Kane Kim kane.ist...@gmail.com wrote: Hi, how to compress output with gzip using python api? Thanks!
Executor Lost with StorageLevel.MEMORY_AND_DISK_SER
Hi there, I’m trying to improve performance on a job that has GC troubles and takes longer to compute simply because it has to recompute failed tasks. After deferring object creation as much as possible, I’m now trying to improve memory usage with StorageLevel.MEMORY_AND_DISK_SER and a custom KryoRegistrator that registers all used classes. This works fine both in unit tests and on a local cluster (i.e. master and worker on my dev machine). On the production cluster this fails without any error message except: Job aborted due to stage failure: Task 10 in stage 2.0 failed 4 times, most recent failure: Lost task 10.3 in stage 2.0 (TID 20, xxx.compute.internal): ExecutorLostFailure (executor lost) Driver stacktrace: Is there any way to understand what’s going on? The logs don’t show anything. I’m using Spark 1.1.1. Thanks - Marius - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
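For reference, a bare-bones sketch of the setup being described (the Record class and registrator name are made up; the real job would register its own classes):

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.storage.StorageLevel

// stand-in for whatever the job actually caches
case class Record(id: Long, values: Array[Double])

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Record])
    kryo.register(classOf[Array[Record]])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[MyRegistrator].getName)

// after building the SparkContext and the RDD as usual:
// rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
```

MEMORY_AND_DISK_SER stores each partition as serialized bytes, so registering the classes up front keeps those bytes small and avoids Kryo falling back to writing full class names.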
How to create spark AMI in AWS
Hi guys, I want to launch a Spark cluster in AWS, and I know there is a spark_ec2.py script. I am using the AWS service in China, but I cannot find the AMI in the China region, so I have to build one. My questions are: 1. Where is the bootstrap script to create the Spark AMI? Is it here ( https://github.com/mesos/spark-ec2/blob/branch-1.3/create_image.sh )? 2. What is the base image of the Spark AMI? E.g., the base image of this ( https://github.com/mesos/spark-ec2/blob/branch-1.3/ami-list/us-west-1/hvm ). 3. Shall I install Scala while building the AMI? Thanks. Guodong
Re: Installing a python library along with ec2 cluster
Hi, Please take a look at http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/creating-an-ami-ebs.html Cheers Gen On Mon, Feb 9, 2015 at 6:41 AM, Chengi Liu chengi.liu...@gmail.com wrote: Hi I am very new both in spark and aws stuff.. Say, I want to install pandas on ec2.. (pip install pandas) How do I create the image and the above library which would be used from pyspark. Thanks On Sun, Feb 8, 2015 at 3:03 AM, gen tang gen.tan...@gmail.com wrote: Hi, You can make a image of ec2 with all the python libraries installed and create a bash script to export python_path in the /etc/init.d/ directory. Then you can launch the cluster with this image and ec2.py Hope this can be helpful Cheers Gen On Sun, Feb 8, 2015 at 9:46 AM, Chengi Liu chengi.liu...@gmail.com wrote: Hi, I want to install couple of python libraries (pip install python_library) which I want to use on pyspark cluster which are developed using the ec2 scripts. Is there a way to specify these libraries when I am building those ec2 clusters? Whats the best way to install these libraries on each ec2 node? Thanks
Re: generate a random matrix with uniform distribution
Yes the example given here should have used uniformVectorRDD. Then it's correct. On Mon, Feb 9, 2015 at 9:56 AM, Luca Puggini lucapug...@gmail.com wrote: Thanks a lot! Can I ask why this code generates a uniform distribution? If dist is N(0,1) data should be N(-1, 2). Let me know. Thanks, Luca 2015-02-07 3:00 GMT+00:00 Burak Yavuz brk...@gmail.com: Hi, You can do the following: ``` import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.random._ // sc is the spark context, numPartitions is the number of partitions you want the RDD to be in val dist: RDD[Vector] = RandomRDDs.normalVectorRDD(sc, n, k, numPartitions, seed) // make the distribution uniform between (-1, 1) val data = dist.map(_ * 2 - 1) val matrix = new RowMatrix(data, n, k) On Feb 6, 2015 11:18 AM, Donbeo lucapug...@gmail.com wrote: Hi I would like to know how can I generate a random matrix where each element come from a uniform distribution in -1, 1 . In particular I would like the matrix be a distributed row matrix with dimension n x p Is this possible with mllib? Should I use another library? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/generate-a-random-matrix-with-uniform-distribution-tp21538.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
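Spelled out with uniformVectorRDD (a sketch; n, k, numPartitions and seed are assumed to be defined as in the quoted snippet):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.random.RandomRDDs

// n x k matrix with entries drawn from U(0, 1)
val dist = RandomRDDs.uniformVectorRDD(sc, n, k, numPartitions, seed)

// rescale every entry to (-1, 1): x -> 2x - 1
val data = dist.map(v => Vectors.dense(v.toArray.map(x => 2 * x - 1)))

val matrix = new RowMatrix(data, n, k)
```

Since the entries start out uniform on (0, 1), the affine map 2x - 1 keeps them uniform on (-1, 1), which is what the original question asked for.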
using spark in web services
Hi experts! I am trying to use Spark in my RESTful web services. I am using the Scala Lift framework for writing the web services. Here is my boot class: class Boot extends Bootable { def boot { Constants.loadConfiguration val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("services")) // Binding Service as a Restful API LiftRules.statelessDispatchTable.append(RestfulService); // resolve the trailing slash issue LiftRules.statelessRewrite.prepend({ case RewriteRequest(ParsePath(path, _, _, true), _, _) if path.last == "index" => RewriteResponse(path.init) }) } } When I remove the line val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("services")) then it works fine. I am starting the services using the command java -jar start.jar jetty.port= and get the following exception: ERROR net.liftweb.http.provider.HTTPProvider - Failed to Boot! Your application may not run properly java.lang.NoClassDefFoundError: org/eclipse/jetty/server/handler/ContextHandler$NoContext at org.eclipse.jetty.servlet.ServletContextHandler.newServletHandler(ServletContextHandler.java:260) at org.eclipse.jetty.servlet.ServletContextHandler.getServletHandler(ServletContextHandler.java:322) at org.eclipse.jetty.servlet.ServletContextHandler.relinkHandlers(ServletContextHandler.java:198) at org.eclipse.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:157) at org.eclipse.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:135) at org.eclipse.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:129) at org.eclipse.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:99) at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:96) at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:87) at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:67) at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:60) at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:60) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.ui.WebUI.attachTab(WebUI.scala:60) at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:50) at org.apache.spark.ui.SparkUI.init(SparkUI.scala:63) Any suggestion please? Am I using the right command to run this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/using-spark-in-web-services-tp21550.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
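The stack trace dies while Spark is wiring up its own embedded Jetty for the web UI, so the clash is between the Jetty that start.jar/Lift puts on the classpath and the one Spark expects. One thing worth trying (an assumption, not a confirmed fix) is turning the Spark UI off so that code path is never taken:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// sketch only: skip Spark's embedded Jetty UI so it cannot collide with
// the Jetty that the Lift container brings in
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("services")
  .set("spark.ui.enabled", "false")
val sc = new SparkContext(conf)
```

If the UI is actually needed, the alternative is aligning the Jetty version used by the web container with the one the Spark build expects.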
Re: SQL group by on Parquet table slower when table cached
Hi Michael, The storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent select on this table in cache shows minimal overheads (GC, queueing, shuffle write etc. etc.), so overhead is not issue. However, it is still twice as slow as reading uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. For mapPartition phase of query on uncached table shows input size of 351 MB. However, after the table is cached, the storage shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this ? mapPartition phase same query for cache table shows input size of 12GB (full size of cache table) and takes twice the time as mapPartition for uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,
Re: Will Spark serialize an entire Object or just the method referred in an object?
`func1` and `func2` never get serialized. They must exist on the other end in the form of a class loaded by the JVM. What gets serialized is an instance of a particular closure (the argument to your map function). That's a separate class. The instance of that class that is serialized contains references to all other instances it needs to execute its apply method (or run or whatever is the correct method name). In this case, nothing is needed, since all it does is pass its argument in a call to a static method (Util.func1). Hope that helps, these things can be really confusing. You can play with javap -c to disassemble the class files to understand better how it all happens under the hood. On Mon, Feb 9, 2015 at 1:56 PM, Yitong Zhou timyit...@gmail.com wrote: If we define an Utils object: object Utils { def func1 = {..} def func2 = {..} } And then in a RDD we refer to one of the function: rdd.map{r = Utils.func1(r)} Will Utils.func2 also get serialized or not? Thanks, Yitong -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Will-Spark-serialize-an-entire-Object-or-just-the-method-referred-in-an-object-tp21566.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
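A small illustration of that distinction (names made up; the second map fails precisely because the closure has to drag its enclosing instance along):

```scala
import org.apache.spark.rdd.RDD

object Utils {
  def func1(s: String): Int = s.length
  def func2(s: String): Int = s.hashCode // never shipped anywhere either way
}

class Pipeline(prefix: String) { // note: NOT Serializable
  // the closure only calls Utils.func1; nothing from Pipeline is captured
  def ok(rdd: RDD[String]): RDD[Int] =
    rdd.map(r => Utils.func1(r))

  // this closure reads `prefix`, so it captures `this`, and Spark fails with
  // "Task not serializable" when the job is submitted
  def boom(rdd: RDD[String]): RDD[Int] =
    rdd.map(r => (prefix + r).length)
}
```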
Re: SQL group by on Parquet table slower when table cached
You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are paying expensive serialization there likely. On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote: Flat data of types String, Int and couple of decimal(14,4) On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote: Is this nested data or flat data? On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, The storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent select on this table in cache shows minimal overheads (GC, queueing, shuffle write etc. etc.), so overhead is not issue. However, it is still twice as slow as reading uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. For mapPartition phase of query on uncached table shows input size of 351 MB. However, after the table is cached, the storage shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this ? mapPartition phase same query for cache table shows input size of 12GB (full size of cache table) and takes twice the time as mapPartition for uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,
Will Spark serialize an entire Object or just the method referred in an object?
If we define a Utils object: object Utils { def func1 = {..} def func2 = {..} } and then in an RDD we refer to one of its functions: rdd.map{r => Utils.func1(r)} will Utils.func2 also get serialized or not? Thanks, Yitong -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Will-Spark-serialize-an-entire-Object-or-just-the-method-referred-in-an-object-tp21566.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SQL group by on Parquet table slower when table cached
Could you share which data types are optimized in the in-memory storage and how are they optimized ? On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com wrote: You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are paying expensive serialization there likely. On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote: Flat data of types String, Int and couple of decimal(14,4) On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote: Is this nested data or flat data? On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, The storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent select on this table in cache shows minimal overheads (GC, queueing, shuffle write etc. etc.), so overhead is not issue. However, it is still twice as slow as reading uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. For mapPartition phase of query on uncached table shows input size of 351 MB. However, after the table is cached, the storage shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this ? mapPartition phase same query for cache table shows input size of 12GB (full size of cache table) and takes twice the time as mapPartition for uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,
Re: SparkSQL DateTime
The standard way to add timestamps is java.sql.Timestamp. On Mon, Feb 9, 2015 at 3:23 PM, jay vyas jayunit100.apa...@gmail.com wrote: Hi spark ! We are working on the bigpetstore-spark implementation in apache bigtop, and want to implement idiomatic date/time usage for SparkSQL. It appears that org.joda.time.DateTime isnt in SparkSQL's rolodex of reflection types. I'd rather not force an artificial dependency on hive dates just for dealing with time stamps. Whats the simplest and cleanest way to map non-spark time values into SparkSQL friendly time values? - One option could be a custom SparkSQL type, i guess? - Any plan to have native spark sql support for Joda Time or (yikes) java.util.Calendar ? -- jay vyas
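A minimal sketch of that suggestion against Spark 1.2's case-class reflection (the Event class and the sample data are made up; the point is just to convert Joda values to java.sql.Timestamp before they reach SparkSQL):

```scala
import java.sql.Timestamp
import org.joda.time.DateTime
import org.apache.spark.sql.SQLContext

case class Event(name: String, ts: Timestamp)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD // implicit RDD[case class] -> SchemaRDD in 1.2

// keep Joda at the edges of the job, map to Timestamp in one place
val raw = sc.parallelize(Seq(("order", DateTime.now()), ("refund", DateTime.now().minusDays(1))))
val events = raw.map { case (name, dt) => Event(name, new Timestamp(dt.getMillis)) }

events.registerTempTable("events")
sqlContext.sql("SELECT name, ts FROM events").collect().foreach(println)
```

Keeping Joda (or java.util.Calendar) only at the edges like this avoids needing a custom SparkSQL type.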
Check if spark was built with hive
Is there an easy way to check if a spark binary release was built with Hive support? Are any of the prebuilt binaries on the spark website built with hive support? Thanks, Ashic.
Re: SQL group by on Parquet table slower when table cached
You could add a new ColumnType https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala . PRs welcome :) On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, As a test, I have same data loaded as another parquet - except with the 2 decimal(14,4) replaced by double. With this, the on disk size is ~345MB, the in-memory size is 2GB (v.s. 12 GB) and the cached query runs in 1/2 the time of uncached query. Would it be possible for Spark to store in-memory decimal in some form of long with decoration ? For the immediate future, is there any hook that we can use to provide custom caching / processing for the decimal type in RDD so other semantic does not changes ? Thanks, On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com wrote: Could you share which data types are optimized in the in-memory storage and how are they optimized ? On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com wrote: You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are paying expensive serialization there likely. On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote: Flat data of types String, Int and couple of decimal(14,4) On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote: Is this nested data or flat data? On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, The storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent select on this table in cache shows minimal overheads (GC, queueing, shuffle write etc. etc.), so overhead is not issue. However, it is still twice as slow as reading uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. For mapPartition phase of query on uncached table shows input size of 351 MB. However, after the table is cached, the storage shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this ? mapPartition phase same query for cache table shows input size of 12GB (full size of cache table) and takes twice the time as mapPartition for uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,
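Until such a ColumnType exists, a stopgap along the lines of the double test described in the quoted message (cast the decimal columns to double just for the cached copy, accepting the loss of exact decimal semantics) can be written directly in SQL; the table and column names below are placeholders matching the a/b/c example query in this thread:

```scala
// build a double-typed projection of the Parquet table and cache that instead
val asDouble = sqlContext.sql(
  "SELECT a, CAST(b AS DOUBLE) AS b, CAST(c AS DOUBLE) AS c FROM parquet_table")
asDouble.registerTempTable("parquet_table_double")
sqlContext.cacheTable("parquet_table_double")

// queries then run against the compact double-backed cache
sqlContext.sql("SELECT a, sum(b), sum(c) FROM parquet_table_double GROUP BY a")
```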
Re: Check if spark was built with hive
https://github.com/apache/spark/blob/master/dev/create-release/create-release.sh#L217 Yes all releases are built with -Phive except the 'without-hive' build. On Mon, Feb 9, 2015 at 10:41 PM, Ashic Mahtab as...@live.com wrote: Is there an easy way to check if a spark binary release was built with Hive support? Are any of the prebuilt binaries on the spark website built with hive support? Thanks, Ashic. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
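For a quick runtime check (just a convenience trick, not an official switch): if the assembly was built with -Phive, the HiveContext class is on the classpath, so this snippet in spark-shell tells you which build you have:

```scala
// prints true for a -Phive build, false otherwise
val hasHive =
  try { Class.forName("org.apache.spark.sql.hive.HiveContext"); true }
  catch { case _: ClassNotFoundException => false }
println("Built with Hive support: " + hasHive)
```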
RE: Check if spark was built with hive
Awesome...thanks Sean. From: so...@cloudera.com Date: Mon, 9 Feb 2015 22:43:45 + Subject: Re: Check if spark was built with hive To: as...@live.com CC: user@spark.apache.org https://github.com/apache/spark/blob/master/dev/create-release/create-release.sh#L217 Yes all releases are built with -Phive except the 'without-hive' build. On Mon, Feb 9, 2015 at 10:41 PM, Ashic Mahtab as...@live.com wrote: Is there an easy way to check if a spark binary release was built with Hive support? Are any of the prebuilt binaries on the spark website built with hive support? Thanks, Ashic. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SQL group by on Parquet table slower when table cached
Hi Michael, As a test, I have same data loaded as another parquet - except with the 2 decimal(14,4) replaced by double. With this, the on disk size is ~345MB, the in-memory size is 2GB (v.s. 12 GB) and the cached query runs in 1/2 the time of uncached query. Would it be possible for Spark to store in-memory decimal in some form of long with decoration ? For the immediate future, is there any hook that we can use to provide custom caching / processing for the decimal type in RDD so other semantic does not changes ? Thanks, On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com wrote: Could you share which data types are optimized in the in-memory storage and how are they optimized ? On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com wrote: You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are paying expensive serialization there likely. On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote: Flat data of types String, Int and couple of decimal(14,4) On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote: Is this nested data or flat data? On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, The storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent select on this table in cache shows minimal overheads (GC, queueing, shuffle write etc. etc.), so overhead is not issue. However, it is still twice as slow as reading uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. For mapPartition phase of query on uncached table shows input size of 351 MB. However, after the table is cached, the storage shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this ? mapPartition phase same query for cache table shows input size of 12GB (full size of cache table) and takes twice the time as mapPartition for uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,
SparkSQL DateTime
Hi spark ! We are working on the bigpetstore-spark implementation in apache bigtop, and want to implement idiomatic date/time usage for SparkSQL. It appears that org.joda.time.DateTime isnt in SparkSQL's rolodex of reflection types. I'd rather not force an artificial dependency on hive dates just for dealing with time stamps. Whats the simplest and cleanest way to map non-spark time values into SparkSQL friendly time values? - One option could be a custom SparkSQL type, i guess? - Any plan to have native spark sql support for Joda Time or (yikes) java.util.Calendar ? -- jay vyas
textFile partitions
Hi folks, puzzled by something pretty simple: I have a standalone cluster with default parallelism of 2, and spark-shell running with 2 cores. sc.textFile("README.md").partitions.size returns 2 (this makes sense). sc.textFile("README.md").coalesce(100, true).partitions.size returns 100, which also makes sense. But sc.textFile("README.md", 100).partitions.size gives 102 -- I was expecting this to be equivalent to the last statement (i.e. result in 100 partitions). I'd appreciate it if someone can enlighten me as to why I end up with 102. This is on Spark 1.2. Thanks
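For what it's worth, the second argument to textFile is a suggested minimum: it is handed to Hadoop's FileInputFormat as a hint, and the actual split computation can come back with a few more partitions (hence 102 rather than 100). A sketch of pinning the count exactly, assuming the extra pass is acceptable:

```scala
// minPartitions is a lower bound/hint, so ask Hadoop for ~100 splits and
// then force exactly 100 partitions afterwards
val exact = sc.textFile("README.md", 100).coalesce(100, shuffle = true)
println(exact.partitions.size) // 100

// equivalently: sc.textFile("README.md").repartition(100)
```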
Re: word2vec more distributed
The C implementation of Word2Vec updates the model using multiple threads without locking. It is hard to implement that in a distributed way. In the MLlib implementation, each worker holds the entire model in memory and outputs the part of the model that gets updated. The driver still needs to collect and aggregate the model updates. So not only the driver but also all workers should have enough memory to hold the full model. You can try to reduce the vector size and set a higher min frequency to make the model smaller. If there are good ideas about how to improve the current implementation, please create a JIRA. -Xiangrui On Thu, Feb 5, 2015 at 1:49 PM, Alex Minnaar aminn...@verticalscope.com wrote: I was wondering if there was any chance of getting a more distributed word2vec implementation. I seem to be running out of memory from big local data structures such as val syn1Global = new Array[Float](vocabSize * vectorSize). Is there any chance of getting a version where these are all put in RDDs? Thanks, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
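To make the two suggestions above concrete, a sketch (the corpus RDD, the threshold and the vector size are illustrative; older MLlib releases don't expose a min-count setter, so the rare-word filtering here is done on the input by hand):

```scala
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.Word2Vec
import org.apache.spark.rdd.RDD

// corpus: RDD[Seq[String]] of tokenized sentences, assumed to exist already
val minCount = 25 // drop words seen fewer than this many times
val counts = corpus.flatMap(identity).map(w => (w, 1L)).reduceByKey(_ + _)
val keep = counts.filter(_._2 >= minCount).keys.collect().toSet
val keepBc = sc.broadcast(keep)
val pruned: RDD[Seq[String]] = corpus.map(_.filter(keepBc.value))

// the model arrays are vocabSize * vectorSize floats, so a smaller
// vocabulary and smaller vectors shrink the per-worker model directly
val model = new Word2Vec().setVectorSize(50).fit(pruned)
```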