Subscription request
Hi, Kindly subscribe me to the user group. Regards, Sathyanarayanan
Re: How to save ReceiverInputDStream to Hadoop using saveAsNewAPIHadoopFile
Hi Buntu,

You could do something similar to the following:

    val receiver_stream = new ReceiverInputDStream(ssc) {
      override def getReceiver(): Receiver[Nothing] = ??? // Whatever
    }.map((x: String) => (null, x))

    val config = new Configuration()
    config.set("mongo.output.uri", "mongodb://akhld:27017/sigmoid.output")

    receiver_stream.foreachRDD(rdd => {
      // make sure your rdd contains a key, value
      val pair_rdd = new PairRDDFunctions[Null, String](rdd)
      pair_rdd.saveAsNewAPIHadoopFile("/home/akhld/sigmoid/beta/", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)
    })

Thanks
Best Regards

On Tue, Oct 21, 2014 at 11:59 PM, Buntu Dev buntu...@gmail.com wrote:

Thanks Akhil, I tried this but I am running into a similar error:

    val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    stream.map(message => { (null, message) }).saveAsNewAPIHadoopFile(destination, classOf[Void], classOf[Group], classOf[ExampleOutputFormat], conf)

Error: value saveAsNewAPIHadoopFile is not a member of org.apache.spark.rdd.RDD[(Null, String)]

How do I go about converting to PairRDDFunctions?

On Fri, Oct 10, 2014 at 12:01 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

You can convert this ReceiverInputDStream (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.ReceiverInputDStream) into PairRDDFunctions (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) and call saveAsNewAPIHadoopFile.

Thanks
Best Regards

On Fri, Oct 10, 2014 at 11:28 AM, Buntu Dev buntu...@gmail.com wrote:

Basically I'm attempting to convert a JSON stream to Parquet, and I get this error without the .values or .map(_._2):

value saveAsNewAPIHadoopFile is not a member of org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, String)]

On Thu, Oct 9, 2014 at 10:15 PM, Sean Owen so...@cloudera.com wrote:

Your RDD does not contain pairs, since you .map(_._2) (BTW that can just be .values). Hadoop files means SequenceFiles, and those store key-value pairs. That's why the method only appears for RDD[(K,V)].

On Fri, Oct 10, 2014 at 3:50 AM, Buntu Dev buntu...@gmail.com wrote:

Thanks Sean, but I'm importing org.apache.spark.streaming.StreamingContext._

Here are the Spark imports:

    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf

    val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    stream.saveAsNewAPIHadoopFile(destination, classOf[Void], classOf[Group], classOf[ExampleOutputFormat], conf)

Anything else I might be missing?
Re: Spark - HiveContext - Unstructured Json
For me inference is not an issue as compared to persistence. Imagine a Streaming application where the input is JSON whose format can vary from row to row and whose format I cannot pre-determine. I can use `sqlContext.jsonRDD`, but once I have the `SchemaRDD`, there is no way for me to update the DDL of the Hive table to add the extra columns that I may have encountered in a JSON row.

-- Hari

On Tue, Oct 21, 2014 at 6:11 PM, Cheng Lian lian.cs@gmail.com wrote:

You can resort to SQLContext.jsonFile(path: String, samplingRate: Double) and set samplingRate to 1.0, so that all the columns can be inferred. You can also use SQLContext.applySchema to specify your own schema (which is a StructType).

On 10/22/14 5:56 AM, Harivardan Jayaraman wrote:

Hi,

I have unstructured JSON as my input, which may have extra columns from row to row. I want to store these JSON rows using HiveContext so that they can be accessed from the JDBC Thrift Server. I notice there are primarily only two methods available on the SchemaRDD for data: saveAsTable and insertInto. One defines the schema while the other can be used to insert into the table, but there is no way to alter the table and add columns to it. How do I do this?

One option that I thought of is to write native CREATE TABLE... and ALTER TABLE.. statements, but that does not seem feasible because at every step I will need to query Hive to determine the current schema and decide whether I should add columns to it or not.

Any thoughts? Has anyone been able to do this?
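The CREATE TABLE / ALTER TABLE option mentioned above can be sketched without querying Hive on every row, assuming the driver keeps its own copy of the known column names. The sketch below is plain Python with no Spark or Hive calls; the table name `events`, the helper names, and the choice to type every new column as STRING are all hypothetical, and nested JSON is not handled. It only builds the statement text; how the statement gets executed is left open.

```python
import json


def missing_columns(known_columns, json_row):
    """Return the top-level keys of json_row that are not known yet, sorted."""
    record = json.loads(json_row)
    return sorted(key for key in record if key not in known_columns)


def alter_table_statement(table, new_columns, column_type="STRING"):
    """Build an ALTER TABLE ... ADD COLUMNS statement, or None if nothing is new."""
    if not new_columns:
        return None
    column_defs = ", ".join("{} {}".format(name, column_type) for name in new_columns)
    return "ALTER TABLE {} ADD COLUMNS ({})".format(table, column_defs)


# Hypothetical driver-side cache of the table's current columns.
known = {"id", "name"}
row = '{"id": 1, "name": "a", "city": "x", "age": 3}'

new_cols = missing_columns(known, row)
statement = alter_table_statement("events", new_cols)
known.update(new_cols)  # remember the columns so later rows do not re-add them
print(statement)
```

Keeping the column set in memory trades the per-row schema lookup for the risk of the cache drifting if something else alters the table, so this is only a starting point.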
Re: rdd.checkpoint() : driver-side lineage??
Hi,

I am no expert, but my best guess is that it's a 'closure' problem. Spark internally builds a closure over all the variables outside the map function's scope that the map operation uses, and it does a serialization check for the map task. Since class scala.util.Random is not serializable, it throws that exception.

The best way to avoid this would be to create a wrapper object which is serializable and create a Random instance inside it. Then you can use that instance, which is part of the serializable object, without an issue.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/rdd-checkpoint-driver-side-lineage-tp9049p16997.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
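The wrapper idea above can be illustrated with a small analogy. This is only a sketch of the pattern in plain Python, with `pickle` standing in for the JVM serialization that Spark applies to closures; Python's own `random.Random` is in fact picklable, so the example shows the shape of the fix rather than reproducing the Scala error. The class name `RandomHolder` and its methods are hypothetical.

```python
import pickle
import random


class RandomHolder:
    """Serializable wrapper: only the seed travels, the generator is built lazily."""

    def __init__(self, seed):
        self.seed = seed
        self._rng = None  # created on first use, never serialized

    def __getstate__(self):
        # Ship only the seed; the generator itself stays behind.
        return {"seed": self.seed}

    def __setstate__(self, state):
        self.seed = state["seed"]
        self._rng = None

    def next_int(self, upper):
        if self._rng is None:
            self._rng = random.Random(self.seed)
        return self._rng.randrange(upper)


holder = RandomHolder(42)
holder.next_int(100)  # use it locally first, so a generator exists

# Round-trip through serialization, as a task shipped to a worker would be.
clone = pickle.loads(pickle.dumps(holder))
```

After the round trip the clone carries no generator and builds a fresh one on first use, which is the behaviour the wrapper suggestion relies on.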
SchemaRDD Convert
Hi all,

I have an RDD of case class T, and T contains several primitive types and a Map. How can I convert this to a SchemaRDD?

Best Regards,
Kevin.
Python vs Scala performance
Hi there,

we have a small Spark cluster running and are processing around 40 GB of Gzip-compressed JSON data per day. I have written a couple of word count-like Scala jobs that essentially pull in all the data and do some joins, group-bys and aggregations. A job takes around 40 minutes to complete.

Now one of the data scientists on the team wants to write some jobs using Python. To learn Spark, he rewrote one of my Scala jobs in Python. From the API side, everything looks more or less identical. However, his jobs take between 5 and 8 hours to complete! We can also see that the execution plan is quite different: I’m seeing writes to the output much later than in Scala.

Is Python I/O really that slow?

Thanks
- Marius
RE: Python vs Scala performance
I'm no expert, but looked into how the python bits work a while back (was trying to assess what it would take to add F# support). It seems python hosts a jvm inside of it, and talks to scala spark in that jvm. The python server bit translates the python calls to those in the jvm. The python spark context is like an adapter to the jvm spark context. If you're seeing performance discrepancies, this might be the reason why. If the code can be organised to require fewer interactions with the adapter, that may improve things. Take this with a pinch of salt...I might be way off on this :)

Cheers,
Ashic.

In reply to mps@gmail.com, Wed, 22 Oct 2014 12:00:41 +0200.
Re: Python vs Scala performance
What version of Spark are you running? Some recent changes (https://spark.apache.org/releases/spark-release-1-1-0.html) to how PySpark works relative to Scala Spark may explain things.

PySpark should not be that much slower, not by a stretch.

In reply to Ashic Mahtab (as...@live.com), Wed, Oct 22, 2014 at 6:11 AM.
Re: Python vs Scala performance
We’re using 1.1.0. Yes I expected Scala to be maybe twice as fast, but not that...

In reply to Nicholas Chammas (nicholas.cham...@gmail.com), 22.10.2014 at 13:02.
RE: SchemaRDD Convert
You needn't do anything, the implicit conversion should do this for you.

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L103
https://github.com/apache/spark/blob/2ac40da3f9fa6d45a59bb45b41606f1931ac5e81/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L35

Just be sure you import the right implicit conversion function.

In reply to Dai, Kevin (yun...@ebay.com), sent Wednesday, October 22, 2014 4:17 PM.
Re: Transforming the Dstream vs transforming each RDDs in the Dstream.
Thanks Matt,

Unlike the feared RDD operations on the driver, it's my understanding that these DStream ops on the driver are merely creating an execution plan for each RDD.

My question still remains: Is it better to foreachRDD early in the process, or to do as many DStream transformations as possible before going into the foreachRDD call? Maybe this will require some empirical testing specific to each implementation?

-kr, Gerard.

On Mon, Oct 20, 2014 at 5:07 PM, Matt Narrell matt.narr...@gmail.com wrote:

http://spark.apache.org/docs/latest/streaming-programming-guide.html

foreachRDD is executed on the driver….

mn

On Oct 20, 2014, at 3:07 AM, Gerard Maas gerard.m...@gmail.com wrote:

Pinging TD -- I'm sure you know :-)

-kr, Gerard.

On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas gerard.m...@gmail.com wrote:

Hi,

We have been implementing several Spark Streaming jobs that are basically processing data and inserting it into Cassandra, sorting it among different keyspaces. We've been following the pattern:

    dstream.foreachRDD { rdd =>
      val records = rdd.map(elem => record(elem))
      targets.foreach(target => records.filter { record => isTarget(target, record) }.writeToCassandra(target, table))
    }

I've been wondering whether there would be a performance difference in transforming the dstream instead of transforming the RDD within the dstream, with regards to how the transformations get scheduled.

Instead of the RDD-centric computation, I could transform the dstream until the last step, where I need an RDD to store. For example, the previous transformation could be written as:

    val recordStream = dstream.map(elem => record(elem))
    targets.foreach { target => recordStream.filter(record => isTarget(target, record)).foreachRDD(_.writeToCassandra(target, table)) }

Would there be a difference in execution and/or performance? What would be the preferred way to do this?

Bonus question: Is there a better (more performant) way to sort the data into different buckets, instead of filtering the data collection once per bucket?

thanks, Gerard.
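On the bonus question, one way to picture the alternative to filtering once per bucket is a single pass that assigns each record to its bucket. The sketch below uses plain Python lists as a stand-in for an RDD and assumes each record belongs to exactly one target, which the isTarget(target, record) predicate in the thread does not guarantee. The function name `bucket_by_target` and the `keyspace` field are hypothetical.

```python
from collections import defaultdict


def bucket_by_target(records, target_of):
    """Group records by target in one pass instead of one filter per target."""
    buckets = defaultdict(list)
    for record in records:
        buckets[target_of(record)].append(record)
    return dict(buckets)


# Hypothetical records; "keyspace" plays the role of the target.
records = [
    {"keyspace": "a", "v": 1},
    {"keyspace": "b", "v": 2},
    {"keyspace": "a", "v": 3},
]

buckets = bucket_by_target(records, lambda r: r["keyspace"])
for target, rows in sorted(buckets.items()):
    print(target, [r["v"] for r in rows])
```

Each record is touched once, so the cost no longer grows with the number of buckets. Whether the same idea pays off in a streaming job depends on how the writes are batched, which would need the empirical testing mentioned in the thread.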
Re: Python vs Scala performance
Total guess without knowing anything about your code: Do either of these two notes from the 1.1.0 release notes (http://spark.apache.org/releases/spark-release-1-1-0.html) affect things at all?

- PySpark now performs external spilling during aggregations. Old behavior can be restored by setting spark.shuffle.spill to false.
- PySpark uses a new heuristic for determining the parallelism of shuffle operations. Old behavior can be restored by setting spark.default.parallelism to the number of cores in the cluster.

Nick

In reply to Marius Soutier (mps@gmail.com), Wed, Oct 22, 2014 at 7:29 AM.
RE: spark sql query optimization , and decision tree building
The "output" variable is actually a SchemaRDD; it provides lots of DSL APIs, see http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD

1) How to save result values of a query into a list?
[CH:] val list: Array[Row] = output.collect; however, getting 1M records into an array does not seem like a good idea.

2) How to calculate variance of a column? Is there any efficient way?
[CH:] Not sure what that means, but you can try output.select('colname).groupby ?

3) I will be running multiple queries on the same data. Does Spark have any way to optimize it?
[CH:] val cachedRdd = output.cache(), and do whatever you need to do based on cachedRdd

4) How to save the output as key-value pairs in a text file?
[CH:] cachedRdd.generate(xx,xx,xx).saveAsTextFile(xx)

5) Is there any way I can build a decision kd tree using the machine learning libraries of Spark?
[CH:] Sorry, I am not sure how a kd tree is used in MLlib, but keep in mind SchemaRDD is just a normal RDD.

Cheng Hao

From: sanath kumar [mailto:sanath1...@gmail.com]
Sent: Wednesday, October 22, 2014 12:58 PM
To: user@spark.apache.org
Subject: spark sql query optimization , and decision tree building

Hi all,

I have a large data set in text files (1,000,000 lines). Each line has 128 columns. Here each line is a feature and each column is a dimension. I have converted the txt files to JSON format and am able to run SQL queries on the JSON files using Spark.

Now I am trying to build a k-dimension decision tree (kd tree) with this large data set.

My steps:
1) Calculate the variance of each column, pick the column with maximum variance and make it the key of the first node, with the mean of the column as the value of the node.
2) Based on the first node value, split the data into 2 parts and repeat the process until you reach a point.

My sample code:

    import sqlContext._
    val people = sqlContext.jsonFile("siftoutput/")
    people.printSchema()
    people.registerTempTable("people")
    val output = sqlContext.sql("SELECT * From people")

My questions:
1) How to save result values of a query into a list?
2) How to calculate variance of a column? Is there any efficient way?
3) I will be running multiple queries on the same data. Does Spark have any way to optimize it?
4) How to save the output as key-value pairs in a text file?
5) Is there any way I can build a decision kd tree using the machine learning libraries of Spark?

Please help.

Thanks,
Sanath
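Step 1 of the kd-tree procedure above (variance per column, then split on the column with the largest variance at its mean) can be sketched in one pass over the rows. This is plain Python over a small in-memory list, not Spark code; the function names are hypothetical, and it uses the population variance via running sums of values and squares, which is simple but can lose precision when values are large relative to their spread.

```python
def column_stats(rows):
    """One pass over the rows: return (means, variances), one entry per column."""
    count = 0
    sums = None
    sums_sq = None
    for row in rows:
        if sums is None:
            sums = [0.0] * len(row)
            sums_sq = [0.0] * len(row)
        count += 1
        for i, value in enumerate(row):
            sums[i] += value
            sums_sq[i] += value * value
    if count == 0:
        raise ValueError("no rows")
    means = [s / count for s in sums]
    # Population variance: E[x^2] - (E[x])^2
    variances = [sq / count - m * m for sq, m in zip(sums_sq, means)]
    return means, variances


def split_column(rows):
    """Pick the column with the largest variance and return (index, mean)."""
    means, variances = column_stats(rows)
    best = max(range(len(variances)), key=lambda i: variances[i])
    return best, means[best]


# Tiny hypothetical data set: 3 rows, 2 columns.
rows = [[1.0, 10.0], [2.0, 30.0], [3.0, 20.0]]
means, variances = column_stats(rows)
column, value = split_column(rows)
print(column, value)
```

Because the per-column sums and sums of squares can be added together, the same computation could in principle be split across partitions and merged, which is what makes a single pass over 1,000,000 rows of 128 columns plausible.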
Re: Spark Hive Snappy Error
Hi,

Yes, I can always reproduce the issue.

Regarding your question about workload, Spark configuration, JDK version and OS version:

I ran SparkPi 1000

java -version
java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)

cat /etc/centos-release
CentOS release 6.5 (Final)

My Spark’s hive-site.xml has the following:

<property>
  <name>hive.exec.compress.output</name>
  <value>true</value>
</property>
<property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
</property>

e.g.

MASTER=spark://m1:7077,m2:7077 ./bin/run-example SparkPi 1000

2014-10-22 20:23:17,033 ERROR [sparkDriver-akka.actor.default-dispatcher-18] actor.ActorSystemImpl (Slf4jLogger.scala:apply$mcV$sp(66)) - Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-2] shutting down ActorSystem [sparkDriver]
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
    at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
    at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
    at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
    at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:829)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2014-10-22 20:23:17,036 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Failed to run reduce at SparkPi.scala:35
Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:240)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at
Re: Transforming the Dstream vs transforming each RDDs in the Dstream.
PS: Just to clarify my statement:

Unlike the feared RDD operations on the driver, it's my understanding that these DStream ops on the driver are merely creating an execution plan for each RDD.

With "feared RDD operations on the driver" I meant to contrast an RDD action like rdd.collect, which would pull all RDD data to the driver, with dstream.foreachRDD(rdd => rdd.op), for which the documentation says 'it runs on the driver'. Yet all that looks to be running on the driver is the scheduling of 'op' on that RDD, just as happens for all other RDD operations (thanks to Sean for the clarification).

So, not to move focus away from the original question: In Spark Streaming, would it be better to do foreachRDD early in a pipeline, or instead to do as many DStream transformations as possible before going into the foreachRDD call?

Between these two pieces of code, from a performance perspective, what would be preferred and why?

- Early foreachRDD:

    dstream.foreachRDD { rdd =>
      val records = rdd.map(elem => record(elem))
      targets.foreach(target => records.filter { record => isTarget(target, record) }.writeToCassandra(target, table))
    }

- As many DStream transformations as possible before foreachRDD:

    val recordStream = dstream.map(elem => record(elem))
    targets.foreach { target => recordStream.filter(record => isTarget(target, record)).foreachRDD(_.writeToCassandra(target, table)) }

kr, Gerard.

In reply to Gerard Maas (gerard.m...@gmail.com), Wed, Oct 22, 2014 at 2:12 PM.
RE: Spark Hive Snappy Error
Thanks a lot, I will try to reproduce this in my local settings and dig into the details, thanks for your information.

BR
Jerry

In reply to arthur.hk.c...@gmail.com, sent Wednesday, October 22, 2014 8:35 PM.
Re: Spark Hive Snappy Error
Hi, FYI, I use snappy-java-1.0.4.1.jar. Regards, Arthur On 22 Oct, 2014, at 8:59 pm, Shao, Saisai saisai.s...@intel.com wrote: Thanks a lot, I will try to reproduce this in my local settings and dig into the details. Thanks for your information. BR Jerry From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com] Sent: Wednesday, October 22, 2014 8:35 PM To: Shao, Saisai Cc: arthur.hk.c...@gmail.com; user Subject: Re: Spark Hive Snappy Error Hi, Yes, I can always reproduce the issue: about your workload, Spark configuration, JDK version and OS version? I ran SparkPi 1000. java -version java version "1.7.0_67" Java(TM) SE Runtime Environment (build 1.7.0_67-b01) Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode) cat /etc/centos-release CentOS release 6.5 (Final) My Spark's hive-site.xml contains the following: <property> <name>hive.exec.compress.output</name> <value>true</value> </property> <property> <name>mapred.output.compression.codec</name> <value>org.apache.hadoop.io.compress.SnappyCodec</value> </property> <property> <name>mapred.output.compression.type</name> <value>BLOCK</value> </property> e.g. 
MASTER=spark://m1:7077,m2:7077 ./bin/run-example SparkPi 1000 2014-10-22 20:23:17,033 ERROR [sparkDriver-akka.actor.default-dispatcher-18] actor.ActorSystemImpl (Slf4jLogger.scala:apply$mcV$sp(66)) - Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-2] shutting down ActorSystem [sparkDriver] java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method) at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316) at org.xerial.snappy.SnappyOutputStream.init(SnappyOutputStream.java:79) at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125) at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83) at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:68) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:829) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2014-10-22 20:23:17,036 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Failed to run reduce at SparkPi.scala:35 Exception in thread main org.apache.spark.SparkException: Job cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399) at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) at akka.actor.ActorCell.terminate(ActorCell.scala:338) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
Re: Python vs Scala performance
Interesting thread Marius, Btw, I'm curious about your cluster size. How small it is in terms of ram and cores. Arian 2014-10-22 13:17 GMT+01:00 Nicholas Chammas nicholas.cham...@gmail.com: Total guess without knowing anything about your code: Do either of these two notes from the 1.1.0 release notes http://spark.apache.org/releases/spark-release-1-1-0.html affect things at all? - PySpark now performs external spilling during aggregations. Old behavior can be restored by setting spark.shuffle.spill to false. - PySpark uses a new heuristic for determining the parallelism of shuffle operations. Old behavior can be restored by setting spark.default.parallelism to the number of cores in the cluster. Nick On Wed, Oct 22, 2014 at 7:29 AM, Marius Soutier mps@gmail.com wrote: We’re using 1.1.0. Yes I expected Scala to be maybe twice as fast, but not that... On 22.10.2014, at 13:02, Nicholas Chammas nicholas.cham...@gmail.com wrote: What version of Spark are you running? Some recent changes https://spark.apache.org/releases/spark-release-1-1-0.html to how PySpark works relative to Scala Spark may explain things. PySpark should not be that much slower, not by a stretch. On Wed, Oct 22, 2014 at 6:11 AM, Ashic Mahtab as...@live.com wrote: I'm no expert, but looked into how the python bits work a while back (was trying to assess what it would take to add F# support). It seems python hosts a jvm inside of it, and talks to scala spark in that jvm. The python server bit translates the python calls to those in the jvm. The python spark context is like an adapter to the jvm spark context. If you're seeing performance discrepancies, this might be the reason why. If the code can be organised to require fewer interactions with the adapter, that may improve things. Take this with a pinch of salt...I might be way off on this :) Cheers, Ashic. 
From: mps@gmail.com Subject: Python vs Scala performance Date: Wed, 22 Oct 2014 12:00:41 +0200 To: user@spark.apache.org Hi there, we have a small Spark cluster running and are processing around 40 GB of Gzip-compressed JSON data per day. I have written a couple of word count-like Scala jobs that essentially pull in all the data and do some joins, group-bys and aggregations. A job takes around 40 minutes to complete. Now one of the data scientists on the team wants to write some jobs using Python. To learn Spark, he rewrote one of my Scala jobs in Python. From the API side, everything looks more or less identical. However, his jobs take between 5 and 8 hours to complete! We can also see that the execution plan is quite different: I'm seeing writes to the output much later than in Scala. Is Python I/O really that slow? Thanks - Marius - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
confused about memory usage in spark
I have a PairRDD of type String,String which I persist to S3 (using the following code). JavaPairRDD<Text, Text> aRDDWritable = aRDD.mapToPair(new ConvertToWritableTypes()); aRDDWritable.saveAsHadoopFile(outputFile, Text.class, Text.class, SequenceFileOutputFormat.class); class ConvertToWritableTypes implements PairFunction<Tuple2<String, String>, Text, Text> { public Tuple2<Text, Text> call(Tuple2<String, String> record) { return new Tuple2(new Text(record._1), new Text(record._2)); } } When I look at the S3-reported size for, say, one of the parts (part-0), it indicates the size is 156MB. I then bring up a spark-shell, load this part-0 and cache it. scala> val keyPair = sc.sequenceFile[String,String]("s3n://somebucket/part-0").cache() After executing an action on the above RDD to force the cache, I look at the storage (using the Application UI) and it shows that I'm using 297MB for this RDD (when it was only 156MB in S3). I get that there could be some differences between the serialized storage format and what is then used in memory, but I'm curious as to whether I'm missing something and/or should be doing things differently. Thanks. Darin.
Re: confused about memory usage in spark
You can enable RDD compression (spark.rdd.compress). You can also use MEMORY_ONLY_SER (sc.sequenceFile[String,String]("s3n://somebucket/part-0").persist(StorageLevel.MEMORY_ONLY_SER)) to reduce the RDD's size in memory. Thanks Best Regards On Wed, Oct 22, 2014 at 7:51 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: I have a PairRDD of type String,String which I persist to S3 (using the following code). JavaPairRDD<Text, Text> aRDDWritable = aRDD.mapToPair(new ConvertToWritableTypes()); aRDDWritable.saveAsHadoopFile(outputFile, Text.class, Text.class, SequenceFileOutputFormat.class); class ConvertToWritableTypes implements PairFunction<Tuple2<String, String>, Text, Text> { public Tuple2<Text, Text> call(Tuple2<String, String> record) { return new Tuple2(new Text(record._1), new Text(record._2)); } } When I look at the S3-reported size for, say, one of the parts (part-0), it indicates the size is 156MB. I then bring up a spark-shell, load this part-0 and cache it. scala> val keyPair = sc.sequenceFile[String,String]("s3n://somebucket/part-0").cache() After executing an action on the above RDD to force the cache, I look at the storage (using the Application UI) and it shows that I'm using 297MB for this RDD (when it was only 156MB in S3). I get that there could be some differences between the serialized storage format and what is then used in memory, but I'm curious as to whether I'm missing something and/or should be doing things differently. Thanks. Darin.
[no subject]
unsubscribe
Re: confused about memory usage in spark
One thing to remember is that Strings are composed of chars in Java, which take 2 bytes each. The encoding of the text on disk on S3 is probably something like UTF-8, which takes much closer to 1 byte per character for English text. This might explain the factor of ~2 difference. On Wed, Oct 22, 2014 at 3:21 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: I have a PairRDD of type String,String which I persist to S3 (using the following code). JavaPairRDD<Text, Text> aRDDWritable = aRDD.mapToPair(new ConvertToWritableTypes()); aRDDWritable.saveAsHadoopFile(outputFile, Text.class, Text.class, SequenceFileOutputFormat.class); class ConvertToWritableTypes implements PairFunction<Tuple2<String, String>, Text, Text> { public Tuple2<Text, Text> call(Tuple2<String, String> record) { return new Tuple2(new Text(record._1), new Text(record._2)); } } When I look at the S3-reported size for, say, one of the parts (part-0), it indicates the size is 156MB. I then bring up a spark-shell, load this part-0 and cache it. scala> val keyPair = sc.sequenceFile[String,String]("s3n://somebucket/part-0").cache() After executing an action on the above RDD to force the cache, I look at the storage (using the Application UI) and it shows that I'm using 297MB for this RDD (when it was only 156MB in S3). I get that there could be some differences between the serialized storage format and what is then used in memory, but I'm curious as to whether I'm missing something and/or should be doing things differently. Thanks. Darin.
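To make the 2-bytes-per-char point concrete, here is a small sketch in plain Python (no Spark involved). It only compares the encoded size of the same text in UTF-8 and in UTF-16, which is the code unit size the reply attributes to Java chars. The helper name encoded_sizes and the sample string are made up for illustration, and the sketch ignores JVM object headers and any SequenceFile framing, so it is a rough intuition rather than an accounting of the 156MB vs 297MB figures.

```python
def encoded_sizes(text):
    """Return (utf8_bytes, utf16_bytes) for the given text."""
    utf8_bytes = len(text.encode("utf-8"))
    # "utf-16-le" avoids the 2-byte byte-order mark, so only the
    # characters themselves are counted.
    utf16_bytes = len(text.encode("utf-16-le"))
    return utf8_bytes, utf16_bytes


# Hypothetical key/value text; mostly-ASCII content is the case discussed above.
sample = "some-key\tsome mostly English value"
utf8_bytes, utf16_bytes = encoded_sizes(sample)
print(utf8_bytes, utf16_bytes, utf16_bytes / utf8_bytes)
```

For pure ASCII text the ratio is exactly 2; text with many non-ASCII characters narrows the gap, because UTF-8 needs more than one byte for those.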
Re:
See first section of http://spark.apache.org/community On Wed, Oct 22, 2014 at 7:42 AM, Margusja mar...@roo.ee wrote: unsubscribe
Re: Python vs Scala performance
Didn't seem to help: conf = SparkConf().set("spark.shuffle.spill", "false").set("spark.default.parallelism", 12) sc = SparkContext(appName='app_name', conf=conf) but it is still taking as much time. On 22.10.2014, at 14:17, Nicholas Chammas nicholas.cham...@gmail.com wrote: Total guess without knowing anything about your code: Do either of these two notes from the 1.1.0 release notes affect things at all? - PySpark now performs external spilling during aggregations. Old behavior can be restored by setting spark.shuffle.spill to false. - PySpark uses a new heuristic for determining the parallelism of shuffle operations. Old behavior can be restored by setting spark.default.parallelism to the number of cores in the cluster. Nick
Rdd of Rdds
Hello, I would like to parallelize my work on multiple RDDs I have. I wanted to know if Spark can support a foreach on an RDD of RDDs. Here's a Java example: public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("testapp"); sparkConf.setMaster("local"); JavaSparkContext sc = new JavaSparkContext(sparkConf); List<String> list = Arrays.asList(new String[] {"1", "2", "3"}); JavaRDD<String> rdd = sc.parallelize(list); List<String> list1 = Arrays.asList(new String[] {"a", "b", "c"}); JavaRDD<String> rdd1 = sc.parallelize(list1); List<JavaRDD<String>> rddList = new ArrayList<JavaRDD<String>>(); rddList.add(rdd); rddList.add(rdd1); JavaRDD<JavaRDD<String>> rddOfRdds = sc.parallelize(rddList); System.out.println(rddOfRdds.count()); rddOfRdds.foreach(new VoidFunction<JavaRDD<String>>() { @Override public void call(JavaRDD<String> t) throws Exception { System.out.println(t.count()); } }); } From this code I'm getting a NullPointerException on the internal count method: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 1 times, most recent failure: Exception failure in TID 1 on host localhost: java.lang.NullPointerException org.apache.spark.rdd.RDD.count(RDD.scala:861) org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:365) org.apache.spark.api.java.JavaRDD.count(JavaRDD.scala:29) Help will be appreciated. Thanks, Tomer
Re: Rdd of Rdds
No, there's no such thing as an RDD of RDDs in Spark. Here though, why not just operate on an RDD of Lists, or a List of RDDs? Usually one of these two is the right approach whenever you feel inclined to operate on an RDD of RDDs. On Wed, Oct 22, 2014 at 3:58 PM, Tomer Benyamini tomer@gmail.com wrote: Hello, I would like to parallelize my work on multiple RDDs I have. I wanted to know if Spark can support a foreach on an RDD of RDDs. Here's a Java example: public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("testapp"); sparkConf.setMaster("local"); JavaSparkContext sc = new JavaSparkContext(sparkConf); List<String> list = Arrays.asList(new String[] {"1", "2", "3"}); JavaRDD<String> rdd = sc.parallelize(list); List<String> list1 = Arrays.asList(new String[] {"a", "b", "c"}); JavaRDD<String> rdd1 = sc.parallelize(list1); List<JavaRDD<String>> rddList = new ArrayList<JavaRDD<String>>(); rddList.add(rdd); rddList.add(rdd1); JavaRDD<JavaRDD<String>> rddOfRdds = sc.parallelize(rddList); System.out.println(rddOfRdds.count()); rddOfRdds.foreach(new VoidFunction<JavaRDD<String>>() { @Override public void call(JavaRDD<String> t) throws Exception { System.out.println(t.count()); } }); } From this code I'm getting a NullPointerException on the internal count method: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 1 times, most recent failure: Exception failure in TID 1 on host localhost: java.lang.NullPointerException org.apache.spark.rdd.RDD.count(RDD.scala:861) org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:365) org.apache.spark.api.java.JavaRDD.count(JavaRDD.scala:29) Help will be appreciated. Thanks, Tomer
Re: Python vs Scala performance
Wild guess maybe, but do you decode the json records in Python ? it could be much slower as the default lib is quite slow. If so try ujson [1] - a C implementation that is at least an order of magnitude faster. HTH [1] https://pypi.python.org/pypi/ujson 2014-10-22 16:51 GMT+02:00 Marius Soutier mps@gmail.com: It’s an AWS cluster that is rather small at the moment, 4 worker nodes @ 28 GB RAM and 4 cores, but fast enough for the currently 40 Gigs a day. Data is on HDFS in EBS volumes. Each file is a Gzip-compress collection of JSON objects, each one between 115-120 MB to be near the HDFS block size. One core per worker is permanently used by a job that allows SQL queries over Parquet files. On 22.10.2014, at 16:18, Arian Pasquali ar...@arianpasquali.com wrote: Interesting thread Marius, Btw, I'm curious about your cluster size. How small it is in terms of ram and cores. Arian 2014-10-22 13:17 GMT+01:00 Nicholas Chammas nicholas.cham...@gmail.com: Total guess without knowing anything about your code: Do either of these two notes from the 1.1.0 release notes http://spark.apache.org/releases/spark-release-1-1-0.html affect things at all? - PySpark now performs external spilling during aggregations. Old behavior can be restored by setting spark.shuffle.spill to false. - PySpark uses a new heuristic for determining the parallelism of shuffle operations. Old behavior can be restored by setting spark.default.parallelism to the number of cores in the cluster. Nick On Wed, Oct 22, 2014 at 7:29 AM, Marius Soutier mps@gmail.com wrote: We’re using 1.1.0. Yes I expected Scala to be maybe twice as fast, but not that... On 22.10.2014, at 13:02, Nicholas Chammas nicholas.cham...@gmail.com wrote: What version of Spark are you running? Some recent changes https://spark.apache.org/releases/spark-release-1-1-0.html to how PySpark works relative to Scala Spark may explain things. PySpark should not be that much slower, not by a stretch. 
On Wed, Oct 22, 2014 at 6:11 AM, Ashic Mahtab as...@live.com wrote: I'm no expert, but looked into how the python bits work a while back (was trying to assess what it would take to add F# support). It seems python hosts a jvm inside of it, and talks to scala spark in that jvm. The python server bit translates the python calls to those in the jvm. The python spark context is like an adapter to the jvm spark context. If you're seeing performance discrepancies, this might be the reason why. If the code can be organised to require fewer interactions with the adapter, that may improve things. Take this with a pinch of salt...I might be way off on this :) Cheers, Ashic. From: mps@gmail.com Subject: Python vs Scala performance Date: Wed, 22 Oct 2014 12:00:41 +0200 To: user@spark.apache.org Hi there, we have a small Spark cluster running and are processing around 40 GB of Gzip-compressed JSON data per day. I have written a couple of word count-like Scala jobs that essentially pull in all the data, do some joins, group bys and aggregations. A job takes around 40 minutes to complete. Now one of the data scientists on the team wants to do write some jobs using Python. To learn Spark, he rewrote one of my Scala jobs in Python. From the API-side, everything looks more or less identical. However his jobs take between 5-8 hours to complete! We can also see that the execution plan is quite different, I’m seeing writes to the output much later than in Scala. Is Python I/O really that slow? Thanks - Marius - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark as key/value store?
Spark, in general, is good for iterating through an entire dataset again and again. All operations are expressed in terms of iteration through all the records of at least one partition. You may want to look at IndexedRDD (https://issues.apache.org/jira/browse/SPARK-2365), which aims to improve point queries. In general though, Spark is unlikely to outperform KV stores because of the nature of scheduling a job for every operation. On Wed, Oct 22, 2014 at 7:51 AM, Hajime Takase placeofnomemor...@gmail.com wrote: Hi, Is it possible to use Spark as a clustered key/value store (say, like redis-cluster or hazelcast)? Will it outperform them in write/read or other operations? My main urge is to use the same RDD from several different SparkContexts without saving to disk or using spark-jobserver, but I'm curious if someone has already tried using Spark like a key/value store. Thanks, Hajime
Re: Spark Bug? job fails to run when given options on spark-submit (but starts and fails without)
Hi Michael Campbell, Are you deploying against YARN or in standalone mode? On YARN, try setting the shell variable SPARK_EXECUTOR_MEMORY=2G; in standalone mode, try setting SPARK_WORKER_MEMORY=2G. Cheers, Holden :) On Thu, Oct 16, 2014 at 2:22 PM, Michael Campbell michael.campb...@gmail.com wrote: TL;DR - a Spark SQL job fails with an OOM (Out of heap space) error. If given --executor-memory values, it won't even start. Even (!) if the values given ARE THE SAME AS THE DEFAULT. Without --executor-memory: 14/10/16 17:14:58 INFO TaskSetManager: Serialized task 1.0:64 as 14710 bytes in 1 ms 14/10/16 17:14:58 WARN TaskSetManager: Lost TID 26 (task 1.0:25) 14/10/16 17:14:58 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError java.lang.OutOfMemoryError: Java heap space at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:609) at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360) ... USING --executor-memory (WITH ANY VALUE), even 1G which is the default: Parsed arguments: master spark://redacted:7077 deployMode null executorMemory 1G ... System properties: spark.executor.memory - 1G spark.eventLog.enabled - true ... 14/10/16 17:14:23 INFO TaskSchedulerImpl: Adding task set 1.0 with 678 tasks 14/10/16 17:14:38 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Spark 1.0.0. Is this a bug?
Re: Usage of spark-ec2: how to deploy a revised version of spark 1.1.0?
You can use the --spark-version argument to spark-ec2 to specify a Git hash corresponding to the version you want to check out. If you made changes that are not in the master repository, you can use --spark-git-repo to specify the Git repository to pull Spark down from; that repository must contain the specified commit hash. On Tue, Oct 21, 2014 at 3:52 PM, sameerf same...@databricks.com wrote: Hi, Can you post what the error looks like? Sameer F.
Re: saveasSequenceFile with codec and compression type
Hi gpatcham, If you want to save as a sequence file with a custom compression type, you can use saveAsHadoopFile along with setting mapred.output.compression.type on the JobConf. If you want to keep using saveAsSequenceFile (and the syntax is much nicer), you could also set that property on the SparkConf, but then it would apply in general. Looking at SequenceFileOutputFormat.java, it seems the default is RECORD, so if that fits for you, you can just use the default too :) Cheers, Holden On Mon, Oct 20, 2014 at 2:41 PM, gpatcham gpatc...@gmail.com wrote: Hi All, I'm trying to save an RDD as a sequence file and am not able to set the compression type (BLOCK or RECORD). Can anyone let me know how we can set the compression type? Here is the code I'm using: RDD.saveAsSequenceFile(target, Some(classOf[org.apache.hadoop.io.compress.GzipCodec])) Thanks
Re: create a Row Matrix
This works for me: import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.linalg.distributed.RowMatrix val v1=Vectors.dense(Array(1d,2d)) val v2=Vectors.dense(Array(3d,4d)) val rows=sc.parallelize(List(v1,v2)) val mat=new RowMatrix(rows) val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(2, computeU = true) On Wed, Oct 22, 2014 at 1:55 AM, viola viola.wiersc...@siemens.com wrote: Thanks for the quick response. However, I still only get error messages. I am able to load a .txt file with entries in it and use it in Spark, but I am not able to create a simple matrix, for instance a 2x2 row matrix [1 2 3 4]. I tried variations such as val RowMatrix = Matrix(2,2,array(1,3,2,4)) but it doesn't work.
Re: Python vs Scala performance
In the master, you can easily profile you job, find the bottlenecks, see https://github.com/apache/spark/pull/2556 Could you try it and show the stats? Davies On Wed, Oct 22, 2014 at 7:51 AM, Marius Soutier mps@gmail.com wrote: It’s an AWS cluster that is rather small at the moment, 4 worker nodes @ 28 GB RAM and 4 cores, but fast enough for the currently 40 Gigs a day. Data is on HDFS in EBS volumes. Each file is a Gzip-compress collection of JSON objects, each one between 115-120 MB to be near the HDFS block size. One core per worker is permanently used by a job that allows SQL queries over Parquet files. On 22.10.2014, at 16:18, Arian Pasquali ar...@arianpasquali.com wrote: Interesting thread Marius, Btw, I'm curious about your cluster size. How small it is in terms of ram and cores. Arian 2014-10-22 13:17 GMT+01:00 Nicholas Chammas nicholas.cham...@gmail.com: Total guess without knowing anything about your code: Do either of these two notes from the 1.1.0 release notes affect things at all? PySpark now performs external spilling during aggregations. Old behavior can be restored by setting spark.shuffle.spill to false. PySpark uses a new heuristic for determining the parallelism of shuffle operations. Old behavior can be restored by setting spark.default.parallelism to the number of cores in the cluster. Nick On Wed, Oct 22, 2014 at 7:29 AM, Marius Soutier mps@gmail.com wrote: We’re using 1.1.0. Yes I expected Scala to be maybe twice as fast, but not that... On 22.10.2014, at 13:02, Nicholas Chammas nicholas.cham...@gmail.com wrote: What version of Spark are you running? Some recent changes to how PySpark works relative to Scala Spark may explain things. PySpark should not be that much slower, not by a stretch. On Wed, Oct 22, 2014 at 6:11 AM, Ashic Mahtab as...@live.com wrote: I'm no expert, but looked into how the python bits work a while back (was trying to assess what it would take to add F# support). 
It seems python hosts a jvm inside of it, and talks to scala spark in that jvm. The python server bit translates the python calls to those in the jvm. The python spark context is like an adapter to the jvm spark context. If you're seeing performance discrepancies, this might be the reason why. If the code can be organised to require fewer interactions with the adapter, that may improve things. Take this with a pinch of salt...I might be way off on this :) Cheers, Ashic. From: mps@gmail.com Subject: Python vs Scala performance Date: Wed, 22 Oct 2014 12:00:41 +0200 To: user@spark.apache.org Hi there, we have a small Spark cluster running and are processing around 40 GB of Gzip-compressed JSON data per day. I have written a couple of word count-like Scala jobs that essentially pull in all the data, do some joins, group bys and aggregations. A job takes around 40 minutes to complete. Now one of the data scientists on the team wants to do write some jobs using Python. To learn Spark, he rewrote one of my Scala jobs in Python. From the API-side, everything looks more or less identical. However his jobs take between 5-8 hours to complete! We can also see that the execution plan is quite different, I’m seeing writes to the output much later than in Scala. Is Python I/O really that slow? Thanks - Marius - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark sql query optimization , and decision tree building
Thank you very much. Two more small questions: 1) val output = sqlContext.sql("SELECT * FROM people") My output has 128 columns and a single row. How do I find which column has the maximum value in a single row using Scala? 2) As each row has 128 columns, how do I print each row into a text file with space delimitation, or as JSON, using Scala? Please reply. Thanks, Sanath On Wed, Oct 22, 2014 at 8:24 AM, Cheng, Hao hao.ch...@intel.com wrote: The "output" variable is actually a SchemaRDD; it provides lots of DSL APIs, see http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD 1) How to save result values of a query into a list? [CH:] val list: Array[Row] = output.collect. However, getting 1M records into an array does not seem like a good idea. 2) How to calculate the variance of a column? Is there any efficient way? [CH:] Not sure what that means, but you can try output.select('colname).groupby ? 3) I will be running multiple queries on the same data. Does Spark have any way to optimize it? [CH:] val cachedRdd = output.cache(), and do whatever you need to do based on cachedRdd. 4) How to save the output as key-value pairs in a text file? [CH:] cachedRdd.generate(xx,xx,xx).saveAsTextFile(xx) 5) Is there any way I can build a decision kd tree using the machine learning libraries of Spark? [CH:] Sorry, I am not sure how a kd tree is used in MLlib, but keep in mind that a SchemaRDD is just a normal RDD. Cheng Hao From: sanath kumar [mailto:sanath1...@gmail.com] Sent: Wednesday, October 22, 2014 12:58 PM To: user@spark.apache.org Subject: spark sql query optimization , and decision tree building Hi all, I have large data in text files (1,000,000 lines). Each line has 128 columns. Here each line is a feature and each column is a dimension. I have converted the txt files to JSON format and am able to run SQL queries on the JSON files using Spark. Now I am trying to build a k-dimensional decision tree (kd tree) with this large data. 
My steps: 1) Calculate the variance of each column, pick the column with maximum variance and make it the key of the first node, with the mean of that column as the value of the node. 2) Based on the first node's value, split the data into 2 parts and repeat the process until you reach a stopping point. My sample code: import sqlContext._ val people = sqlContext.jsonFile("siftoutput/") people.printSchema() people.registerTempTable("people") val output = sqlContext.sql("SELECT * FROM people") My questions: 1) How to save result values of a query into a list? 2) How to calculate the variance of a column? Is there any efficient way? 3) I will be running multiple queries on the same data. Does Spark have any way to optimize it? 4) How to save the output as key-value pairs in a text file? 5) Is there any way I can build a decision kd tree using the machine learning libraries of Spark? Please help. Thanks, Sanath
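The two steps described in the original question (pick the column with the largest variance, use its mean as the node value, split the rows on it) can be sketched on a small in-memory list as follows. This is plain Python, not Spark or MLlib code, and it is not taken from the thread; the function names choose_split and split_rows and the toy rows are hypothetical. Using population variance and sending rows equal to the mean to the left are assumptions, since the post does not specify either.

```python
from statistics import mean, pvariance


def choose_split(rows):
    """Return (column_index, column_mean) for the column with the largest variance."""
    columns = list(zip(*rows))  # transpose rows into columns
    variances = [pvariance(col) for col in columns]
    split_col = max(range(len(columns)), key=lambda i: variances[i])
    return split_col, mean(columns[split_col])


def split_rows(rows, split_col, threshold):
    """Partition rows into (left, right) around the threshold on one column."""
    left = [row for row in rows if row[split_col] <= threshold]
    right = [row for row in rows if row[split_col] > threshold]
    return left, right


# Toy data: 4 rows with 2 columns instead of 1,000,000 rows with 128 columns.
rows = [(1.0, 10.0), (2.0, 20.0), (3.0, 30.0), (4.0, 40.0)]
col, threshold = choose_split(rows)
left, right = split_rows(rows, col, threshold)
print(col, threshold, left, right)
```

Repeating the same two calls on left and right gives the recursive construction; a stopping rule (for example a minimum number of rows per node) would still have to be chosen.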
Re: Spark Streaming Applications
Hi Saiph, Patrick McFadin and Helena Edelson from DataStax taught a tutorial at NYC Strata last week where they created a prototype Spark Streaming + Kafka application for time series data. You can see the code here: https://github.com/killrweather/killrweather On Tue, Oct 21, 2014 at 4:33 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I have been trying to find a fairly complex application that makes use of the Spark Streaming framework. I checked public github repos but the examples I found were too simple, only comprising simple operations like counters and sums. On the Spark summit website, I could find very interesting projects, however no source code was available. Where can I find non-trivial spark streaming application code? Is it that difficult? Thanks.
Re: Python vs Scala performance
On Wed, Oct 22, 2014 at 11:34 AM, Eustache DIEMERT eusta...@diemert.fr wrote: Wild guess maybe, but do you decode the json records in Python ? it could be much slower as the default lib is quite slow. Oh yeah, this is a good place to look. Also, just upgrading to Python 2.7 may be enough performance improvement because they merged in the fast JSON deserializing from simplejson into the standard library. So you may not need to use an external library like ujson, though that may help too. Nick
Sharing spark context across multiple spark sql cli initializations
We want to run multiple instances of the Spark SQL CLI on our YARN cluster, each used by a different user. Having every user bring up a separate CLI looks non-optimal, given that Spark on YARN runs executor processes (and hence consumes resources) on worker nodes for the lifetime of the application. So the right approach seems to be to run just one Spark SQL application and share its Spark context across multiple CLI initializations. Is this understanding correct? Is there a way to do it currently? It seems like it needs some kind of Thrift interface hooked into the CLI driver.
Re: Usage of spark-ec2: how to deploy a revised version of spark 1.1.0?
Thanks Daniil! if I use --spark-git-repo, is there a way to specify the mvn command line parameters? like following mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package mvn -Pyarn -Phadoop-2.3 -Phbase-hadoop2 -Dhadoop.version=2.3.0 -DskipTests clean package -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Usage-of-spark-ec2-how-to-deploy-a-revised-version-of-spark-1-1-0-tp16943p17040.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Multitenancy in Spark - within/across spark context
Hi Spark devs/users, One of the things we are investigating here at Netflix is if Spark would suit us for our ETL needs, and one of requirements is multi tenancy. I did read the official doc http://spark.apache.org/docs/latest/job-scheduling.html and the book, but I'm still not clear on certain things. Here are my questions : 1. *Sharing spark context* : How exactly multiple users can share the cluster using same spark context ? UserA wants to run AppA, UserB wants to run AppB. How do they talk to same context ? How exactly are each of their jobs scheduled and run in same context? Is preemption supported in this scenario ? How are user names passed on to the spark context ? 2. *Different spark context in YARN*: assuming I have a YARN cluster with queues and preemption configured. Are there problems if executors/containers of a spark app are preempted to allow a high priority spark app to execute ? Would the preempted app get stuck or would it continue to make progress? How are user names passed on from spark to yarn(say I'm using nested user queues feature in fair scheduler) ? 3. Sharing RDDs in 1 and 2 above ? 4. Anything else about user/job isolation ? I know I'm asking a lot of questions. Thanks in advance :) ! -- Thanks, Ashwin Netflix
Shuffle issues in the current master
Hi all, With SPARK-3948, the exception in Snappy PARSING_ERROR is gone, but I've another exception now. I've no clue about what's going on; does anyone run into similar issue? Thanks. This is the configuration I use. spark.rdd.compress true spark.shuffle.consolidateFiles true spark.shuffle.manager SORT spark.akka.frameSize 128 spark.akka.timeout 600 spark.core.connection.ack.wait.timeout 600 spark.core.connection.auth.wait.timeout 300 java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2325) java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794) java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801) java.io.ObjectInputStream.init(ObjectInputStream.java:299) org.apache.spark.serializer.JavaDeserializationStream$$anon$1.init(JavaSerializer.scala:57) org.apache.spark.serializer.JavaDeserializationStream.init(JavaSerializer.scala:57) org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:95) org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:351) org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196) org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196) org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243) org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:89) org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44) 
org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:56) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Multitenancy in Spark - within/across spark context
Hi Ashwin, Let me try to answer to the best of my knowledge. On Wed, Oct 22, 2014 at 11:47 AM, Ashwin Shankar ashwinshanka...@gmail.com wrote: Here are my questions : 1. Sharing spark context : How exactly multiple users can share the cluster using same spark context ? That's not something you might want to do usually. In general, a SparkContext maps to a user application, so each user would submit their own job, which would create its own SparkContext. If you want to go outside of Spark, there are projects which allow you to manage SparkContext instances outside of applications and potentially share them, such as https://github.com/spark-jobserver/spark-jobserver. But be sure you actually need it; since you haven't really explained the use case, it's not very clear. 2. Different spark context in YARN: assuming I have a YARN cluster with queues and preemption configured. Are there problems if executors/containers of a spark app are preempted to allow a high priority spark app to execute ? As far as I understand, this will cause executors to be killed, which means that Spark will start retrying tasks to rebuild the data that was held by those executors when needed. Yarn mode does have a configurable upper limit on the number of executor failures, so if your job keeps getting preempted it will eventually fail (unless you tweak the settings). I don't recall whether Yarn has an API to cleanly allow clients to stop executors when preempted, but even if it does, I don't think that's supported in Spark at the moment. How are user names passed on from spark to yarn(say I'm using nested user queues feature in fair scheduler) ? Spark will try to run the job as the requesting user; if you're not using Kerberos, that means the processes themselves will be run as whatever user runs the Yarn daemons, but the Spark app will be run inside a UserGroupInformation.doAs() call as the requesting user. So technically nested queues should work as expected. 3.
Sharing RDDs in 1 and 2 above ? I'll assume you don't mean actually sharing RDDs in the same context, but between different SparkContext instances. You might (big might here) be able to checkpoint an RDD from one context and load it from another context; that's actually how some HA-like features for Spark drivers are being addressed. The job server I mentioned before, which allows different apps to share the same Spark context, also has a feature to share RDDs by name, without having to resort to checkpointing. Hope this helps! -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
streaming join sliding windows
Hi, How can I join neighbor sliding windows in spark streaming? Thanks, Josh
Re: Shuffle issues in the current master
It seems that this issue should be addressed by https://github.com/apache/spark/pull/2890 ? Am I right? Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai
Re: Shuffle issues in the current master
Or can it be solved by setting both of the following settings to true for now? spark.shuffle.spill.compress true spark.shuffle.compress true Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai
Setting only master heap
We've been getting some OOMs from the spark master since upgrading to Spark 1.1.0. I've found SPARK_DAEMON_MEMORY, but that also seems to increase the worker heap, which as far as I know is fine. Is there any setting which *only* increases the master heap size? Keith
Re: Shuffle issues in the current master
PS, sorry for spamming the mailing list. To my knowledge, both spark.shuffle.spill.compress and spark.shuffle.compress default to true, so in theory we should not run into this issue if we don't change any settings. Is there any other bug we are running into? Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai
Re: Rdd of Rdds
Another approach could be to create artificial keys for each RDD and convert them to pair RDDs. So your first RDD becomes JavaPairRDD<Integer, String> rdd1 with values (1, 1); (1, 2) and so on. The second RDD becomes rdd2 with values (2, a); (2, b); (2, c). You can union the two RDDs, then groupByKey, countByKey etc., and maybe achieve what you are trying to do. Sorry, this is just a hypothesis, as I am not entirely sure what you are trying to achieve. Ideally, I would think hard about whether multiple RDDs are indeed needed, just as Sean pointed out. Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Wed, Oct 22, 2014 at 8:35 PM, Sean Owen so...@cloudera.com wrote: No, there's no such thing as an RDD of RDDs in Spark. Here though, why not just operate on an RDD of Lists? or a List of RDDs? Usually one of these two is the right approach whenever you feel inclined to operate on an RDD of RDDs. On Wed, Oct 22, 2014 at 3:58 PM, Tomer Benyamini tomer@gmail.com wrote: Hello, I would like to parallelize my work on multiple RDDs I have. I wanted to know if spark can support a foreach on an RDD of RDDs.
Here's a java example: public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("testapp"); sparkConf.setMaster("local"); JavaSparkContext sc = new JavaSparkContext(sparkConf); List<String> list = Arrays.asList(new String[] {"1", "2", "3"}); JavaRDD<String> rdd = sc.parallelize(list); List<String> list1 = Arrays.asList(new String[] {"a", "b", "c"}); JavaRDD<String> rdd1 = sc.parallelize(list1); List<JavaRDD<String>> rddList = new ArrayList<JavaRDD<String>>(); rddList.add(rdd); rddList.add(rdd1); JavaRDD<JavaRDD<String>> rddOfRdds = sc.parallelize(rddList); System.out.println(rddOfRdds.count()); rddOfRdds.foreach(new VoidFunction<JavaRDD<String>>() { @Override public void call(JavaRDD<String> t) throws Exception { System.out.println(t.count()); } }); } From this code I'm getting a NullPointerException on the internal count method: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 1 times, most recent failure: Exception failure in TID 1 on host localhost: java.lang.NullPointerException org.apache.spark.rdd.RDD.count(RDD.scala:861) org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:365) org.apache.spark.api.java.JavaRDD.count(JavaRDD.scala:29) Help will be appreciated. Thanks, Tomer - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
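The artificial-key idea from this thread can be sketched as follows. This is only an illustration under stated assumptions: plain Scala collections stand in for the two RDDs, and `TaggedUnion` and its field names are hypothetical. On real pair RDDs the corresponding steps would be a union followed by something like countByKey, as suggested in the reply.

```scala
object TaggedUnion {
  // Stand-ins for the two RDDs from the question.
  val first: Seq[String]  = Seq("1", "2", "3")
  val second: Seq[String] = Seq("a", "b", "c")

  // Tag every element with an artificial key naming its source collection,
  // then concatenate (the collection analogue of a union of pair RDDs).
  val tagged: Seq[(Int, String)] =
    first.map(v => (1, v)) ++ second.map(v => (2, v))

  // Number of elements per artificial key.
  val countsByKey: Map[Int, Int] =
    tagged.groupBy(_._1).map { case (key, values) => (key, values.size) }
}
```

The point of the design is that a single keyed dataset replaces the nested one, so the per-source counts come out of one pass instead of calling count on an RDD from inside another RDD's foreach.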
ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId
Hi, I just tried sample PI calculation on Spark Cluster, after returning the Pi result, it shows ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(m37,35662) not found ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://m33:7077 --executor-memory 512m --total-executor-cores 40 examples/target/spark-examples_2.10-1.1.0.jar 100 14/10/23 05:09:03 INFO TaskSetManager: Finished task 87.0 in stage 0.0 (TID 87) in 346 ms on m134.emblocsoft.net (99/100) 14/10/23 05:09:03 INFO TaskSetManager: Finished task 98.0 in stage 0.0 (TID 98) in 262 ms on m134.emblocsoft.net (100/100) 14/10/23 05:09:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 14/10/23 05:09:03 INFO DAGScheduler: Stage 0 (reduce at SparkPi.scala:35) finished in 2.597 s 14/10/23 05:09:03 INFO SparkContext: Job finished: reduce at SparkPi.scala:35, took 2.725328861 s Pi is roughly 3.1414948 14/10/23 05:09:03 INFO SparkUI: Stopped Spark web UI at http://m33:4040 14/10/23 05:09:03 INFO DAGScheduler: Stopping DAGScheduler 14/10/23 05:09:03 INFO SparkDeploySchedulerBackend: Shutting down all executors 14/10/23 05:09:03 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 14/10/23 05:09:04 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@37852165 14/10/23 05:09:04 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(m37,35662) 14/10/23 05:09:04 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(m37,35662) 14/10/23 05:09:04 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(m37,35662) not found 14/10/23 05:09:04 INFO ConnectionManager: key already cancelled ? 
sun.nio.ch.SelectionKeyImpl@37852165 java.nio.channels.CancelledKeyException at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386) at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139) 14/10/23 05:09:04 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(m36,34230) 14/10/23 05:09:04 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(m35,50371) 14/10/23 05:09:04 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(m36,34230) 14/10/23 05:09:04 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(m34,41562) 14/10/23 05:09:04 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(m36,34230) not found 14/10/23 05:09:04 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(m35,50371) 14/10/23 05:09:04 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(m35,50371) not found 14/10/23 05:09:04 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(m33,39517) 14/10/23 05:09:04 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(m33,39517) 14/10/23 05:09:04 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(m34,41562) 14/10/23 05:09:04 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(m33,39517) not found 14/10/23 05:09:04 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(m34,41562) java.nio.channels.ClosedChannelException at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295) at org.apache.spark.network.SendingConnection.read(Connection.scala:390) at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 14/10/23 05:09:04 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(m34,41562) 14/10/23 05:09:04 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(m34,41562) 14/10/23 05:09:04 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@2e0b5c4a 14/10/23 05:09:04 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@2e0b5c4a java.nio.channels.CancelledKeyException at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386) at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139) 14/10/23 05:09:04 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@653f8844 14/10/23 05:09:04 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@653f8844 java.nio.channels.CancelledKeyException at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386) at
Re: Multitenancy in Spark - within/across spark context
Thanks Marcelo, that was helpful! I had some follow-up questions: "That's not something you might want to do usually. In general, a SparkContext maps to a user application" My question was basically this: the http://spark.apache.org/docs/latest/job-scheduling.html page in the official doc, under the "Scheduling within an application" section, talks about multiuser and fair sharing within an app. How does multiuser within an application work (how do users connect to an app and run their stuff)? When would I want to use this? "As far as I understand, this will cause executors to be killed, which means that Spark will start retrying tasks to rebuild the data that was held by those executors when needed." I basically wanted to find out if there are any gotchas related to preemption on Spark. For example, if half of an application's executors got preempted while doing reduceByKey, will the application progress with the remaining resources/fair share? I'm new to Spark, sorry if I'm asking something very obvious :). Thanks, Ashwin
Does SQLSpark support Hive built in functions?
Hi, I just wonder if SparkSQL supports Hive built-in functions (e.g. from_unixtime) or any of the functions pointed out here : ( https://cwiki.apache.org/confluence/display/Hive/Tutorial) best, /Shahab
Re: Python vs Scala performance
Yeah we’re using Python 2.7.3. On 22.10.2014, at 20:06, Nicholas Chammas nicholas.cham...@gmail.com wrote: On Wed, Oct 22, 2014 at 11:34 AM, Eustache DIEMERT eusta...@diemert.fr wrote: Wild guess maybe, but do you decode the json records in Python ? it could be much slower as the default lib is quite slow. Oh yeah, this is a good place to look. Also, just upgrading to Python 2.7 may be enough performance improvement because they merged in the fast JSON deserializing from simplejson into the standard library. So you may not need to use an external library like ujson, though that may help too. Nick
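A rough way to check whether JSON decoding is the bottleneck, as suggested above, is to time the decode step on its own, outside Spark. This is only a sketch: the sample record and the `decode_all` helper are made up for illustration, and it uses nothing but the standard-library `json` module. Passing a different `loads` function (for example from simplejson or ujson, if installed) is an assumption left to the reader.

```python
import json
import timeit

# Hypothetical sample record; replace with a line from the real input.
sample = json.dumps({"id": 1, "user": "abc", "tags": ["x", "y"], "score": 0.5})
lines = [sample] * 10000


def decode_all(records, loads=json.loads):
    """Decode every JSON line with the given loads function."""
    return [loads(r) for r in records]


if __name__ == "__main__":
    # Time five passes over the sample lines with the stdlib decoder.
    seconds = timeit.timeit(lambda: decode_all(lines), number=5)
    print("stdlib json: %.3f s for %d decodes" % (seconds, 5 * len(lines)))
```

If the per-record time here is a large share of the per-record time of the whole job, the decoder is worth swapping; if not, the slowdown is probably elsewhere.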
Re: Python vs Scala performance
Can’t install that on our cluster, but I can try locally. Is there a pre-built binary available? On 22.10.2014, at 19:01, Davies Liu dav...@databricks.com wrote: In the master, you can easily profile your job and find the bottlenecks, see https://github.com/apache/spark/pull/2556 Could you try it and show the stats? Davies - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Multitenancy in Spark - within/across spark context
On Wed, Oct 22, 2014 at 2:17 PM, Ashwin Shankar ashwinshanka...@gmail.com wrote: That's not something you might want to do usually. In general, a SparkContext maps to a user application My question was basically this. In this page in the official doc, under Scheduling within an application section, it talks about multiuser and fair sharing within an app. How does multiuser within an application work(how users connect to an app,run their stuff) ? When would I want to use this ? I see. The way I read that page is that Spark supports all those scheduling options; but Spark doesn't give you the means to actually be able to submit jobs from different users to a running SparkContext hosted on a different process. For that, you'll need something like the job server that I referenced before, or write your own framework for supporting that. Personally, I'd use the information on that page when dealing with concurrent jobs in the same SparkContext, but still restricted to the same user. I'd avoid trying to create any application where a single SparkContext is trying to be shared by multiple users in any way. As far as I understand, this will cause executors to be killed, which means that Spark will start retrying tasks to rebuild the data that was held by those executors when needed. I basically wanted to find out if there were any gotchas related to preemption on Spark. Things like say half of an application's executors got preempted say while doing reduceByKey, will the application progress with the remaining resources/fair share ? Jobs should still make progress as long as at least one executor is available. The gotcha would be the one I mentioned, where Spark will fail your job after x executors failed, which might be a common occurrence when preemption is enabled. That being said, it's a configurable option, so you can set x to a very large value and your job should keep on chugging along. 
The options you'd want to take a look at are: spark.task.maxFailures and spark.yarn.max.executor.failures -- Marcelo
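As a minimal sketch of how those two options might be raised for a single submission, the snippet below builds a spark-submit command line. It assumes a spark-submit that accepts `--conf key=value` (check your version); the values and the `your-app.jar` name are purely illustrative, since nothing in this thread recommends specific numbers.

```python
# Illustrative values only; pick limits that suit how often preemption happens.
overrides = {
    "spark.task.maxFailures": "8",
    "spark.yarn.max.executor.failures": "1000",
}


def conf_args(settings):
    """Turn a dict of Spark properties into repeated --conf key=value arguments."""
    args = []
    for key in sorted(settings):
        args.extend(["--conf", "%s=%s" % (key, settings[key])])
    return args


# "your-app.jar" is a placeholder for the real application artifact.
command = ["spark-submit"] + conf_args(overrides) + ["your-app.jar"]
print(" ".join(command))
```

The same two keys could instead go into spark-defaults.conf if they should apply to every job on the cluster.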
Re: Rdd of Rdds
On Wednesday, October 22, 2014 9:06 AM, Sean Owen so...@cloudera.com wrote: No, there's no such thing as an RDD of RDDs in Spark. Here though, why not just operate on an RDD of Lists? or a List of RDDs? Usually one of these two is the right approach whenever you feel inclined to operate on an RDD of RDDs. Depending on one's needs, one could also consider the matrix (RDD[Vector]) operations provided by MLLib, such as https://spark.apache.org/docs/latest/mllib-statistics.html
Re: Does SQLSpark support Hive built in functions?
Yes, when using a HiveContext. On Wed, Oct 22, 2014 at 2:20 PM, shahab shahab.mok...@gmail.com wrote: Hi, I just wonder if SparkSQL supports Hive built-in functions (e.g. from_unixtime) or any of the functions pointed out here : ( https://cwiki.apache.org/confluence/display/Hive/Tutorial) best, /Shahab
Re: Python vs Scala performance
Sorry, there is not. You can try cloning it from GitHub and building it from scratch, see [1]. [1] https://github.com/apache/spark Davies On Wed, Oct 22, 2014 at 2:31 PM, Marius Soutier mps@gmail.com wrote: Can’t install that on our cluster, but I can try locally. Is there a pre-built binary available? On 22.10.2014, at 19:01, Davies Liu dav...@databricks.com wrote: In the master, you can easily profile your job and find the bottlenecks, see https://github.com/apache/spark/pull/2556 Could you try it and show the stats? Davies
Re: Sharing spark context across multiple spark sql cli initializations
The JDBC server is what you are looking for: http://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbc-server On Wed, Oct 22, 2014 at 11:10 AM, Sadhan Sood sadhan.s...@gmail.com wrote: We want to run multiple instances of spark sql cli on our yarn cluster. Each instance of the cli is to be used by a different user. This looks non-optimal if each user brings up a different cli given how spark works on yarn by running executor processes (and hence consuming resources) on worker nodes for the lifetime of the application. So, the right way seems like to use the same spark context shared across multiple initializations and running just one spark sql application. Is the understanding correct ? Is there a way to do it currently ? Seem like it needs some kind of thrift interface hooked into the cli driver.
Re: Setting only master heap
Hi Keith, It would be helpful if you could post the error message. Are you running Spark in Standalone mode or with YARN? In general, the Spark Master is only used for scheduling and it should be fine with the default setting of 512 MB RAM. Is it actually the Spark Driver's memory that you intended to change? *++ If in Standalone mode ++* You're right that SPARK_DAEMON_MEMORY sets the memory to allocate to the Spark Master, Worker and even HistoryServer daemons together. SPARK_WORKER_MEMORY is slightly confusing. In Standalone mode, it is the amount of memory that a worker advertises as available for drivers to launch executors. The sum of the memory used by executors spawned from a worker cannot exceed SPARK_WORKER_MEMORY. Unfortunately, I'm not aware of a way to set the memory for Master and Worker individually, other than launching them manually. You can also try setting the config differently in each machine's spark-env.sh file. *++ If in YARN mode ++* In YARN, there is no setting for SPARK_DAEMON_MEMORY, which is why it appears only in the Standalone documentation. Remember that in YARN mode there is no Spark Worker; instead the YARN NodeManagers launch the Executors. And in YARN, there is no need to run a Spark Master JVM (since the YARN ResourceManager takes care of the scheduling). So, with YARN use SPARK_EXECUTOR_MEMORY to set the Executor's memory, and use SPARK_DRIVER_MEMORY to set the Driver's memory. Just an FYI - for compatibility's sake, even in YARN mode there is a setting for SPARK_WORKER_MEMORY, but this has been deprecated. If you do set it, it just does the same thing as setting SPARK_EXECUTOR_MEMORY would have done. - Sameer On Wed, Oct 22, 2014 at 1:46 PM, Keith Simmons ke...@pulse.io wrote: We've been getting some OOMs from the spark master since upgrading to Spark 1.1.0. I've found SPARK_DAEMON_MEMORY, but that also seems to increase the worker heap, which as far as I know is fine. Is there any setting which *only* increases the master heap size? 
Keith
Re: SchemaRDD Convert
The implicit conversion function mentioned by Hao is createSchemaRDD in SQLContext/HiveContext. You can import it by doing
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Or new org.apache.spark.sql.hive.HiveContext(sc) for HiveContext
    import sqlContext.createSchemaRDD
On Wed, Oct 22, 2014 at 8:03 AM, Cheng, Hao hao.ch...@intel.com wrote: You needn’t do anything, the implicit conversion should do this for you. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L103 https://github.com/apache/spark/blob/2ac40da3f9fa6d45a59bb45b41606f1931ac5e81/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L35 Just be sure you import the right implicit conversion function. *From:* Dai, Kevin [mailto:yun...@ebay.com] *Sent:* Wednesday, October 22, 2014 4:17 PM *To:* user@spark.apache.org *Subject:* SchemaRDD Convert Hi, ALL I have a RDD of case class T and T contains several primitive types and a Map. How can I convert this to a SchemaRDD? Best Regards, Kevin.
Solving linear equations
Hi, I'm wondering how to use Mllib for solving equation systems following this pattern
2*x1 + x2 + 3*x3 + ... + xn = 0
x1 + 0*x2 + 3*x3 + ... + xn = 0
...
0*x1 + x2 + 0*x3 + ... + xn = 0
I definitely still have some reading to do to really understand the direct solving techniques, but at the current state of knowledge SVD could help me with this right? Can you point me to an example or a tutorial? best regards
Re: Solving linear equations
Hi Martin, This problem is Ax = B where A is your matrix [2 1 3 ... 1; 1 0 3 ...;] and x is what you want to find. B is 0 in this case. For mllib this is normally the label; basically create a LabeledPoint where the label is always 0. Use mllib's linear regression and solve the following problem: min ||Ax - B||_{2}^{2} + lambda||x||_{2}^{2} Put a small regularization to condition the problem (~1e-4) and play with some options for the learning rate in linear regression. The parameter vector that you get out of mllib linear regression is the answer to your linear equation solver. Thanks. Deb On Wed, Oct 22, 2014 at 4:15 PM, Martin Enzinger martin.enzin...@gmail.com wrote: Hi, I'm wondering how to use Mllib for solving equation systems following this pattern
2*x1 + x2 + 3*x3 + ... + xn = 0
x1 + 0*x2 + 3*x3 + ... + xn = 0
...
0*x1 + x2 + 0*x3 + ... + xn = 0
I definitely still have some reading to do to really understand the direct solving techniques, but at the current state of knowledge SVD could help me with this right? Can you point me to an example or a tutorial? best regards
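One thing worth checking before trying this on a cluster: setting the gradient of min ||Ax - B||_{2}^{2} + lambda||x||_{2}^{2} to zero gives (A^T A + lambda*I) x = A^T B, so with B = 0 and lambda > 0 the unique minimizer is x = 0, the trivial solution. Non-trivial solutions of Ax = 0 live in the null space of A, which is what the SVD idea in the question would look for. The sketch below is plain Python, not MLlib; the 3x3 matrix just reuses the first three coefficients of each row in the question, and `ridge_solve` is a hypothetical helper written for this illustration.

```python
def ridge_solve(A, b, lam):
    """Solve (A^T A + lam*I) x = A^T b by Gauss-Jordan elimination with partial pivoting."""
    n = len(A[0])
    # Normal-equation matrix M = A^T A + lam*I and right-hand side r = A^T b.
    M = [[sum(row[i] * row[j] for row in A) + (lam if i == j else 0.0)
          for j in range(n)] for i in range(n)]
    r = [sum(row[i] * bi for row, bi in zip(A, b)) for i in range(n)]
    for col in range(n):
        # Swap in the row with the largest entry in this column.
        pivot = max(range(col, n), key=lambda k: abs(M[k][col]))
        M[col], M[pivot] = M[pivot], M[col]
        r[col], r[pivot] = r[pivot], r[col]
        # Eliminate this column from every other row.
        for k in range(n):
            if k != col:
                f = M[k][col] / M[col][col]
                M[k] = [mk - f * mc for mk, mc in zip(M[k], M[col])]
                r[k] -= f * r[col]
    return [r[i] / M[i][i] for i in range(n)]


# Illustrative 3x3 system built from the leading coefficients in the question.
A = [[2.0, 1.0, 3.0],
     [1.0, 0.0, 3.0],
     [0.0, 1.0, 0.0]]

x_homogeneous = ridge_solve(A, [0.0, 0.0, 0.0], 1e-4)  # right-hand side is all zeros
x_general = ridge_solve(A, [1.0, 2.0, 3.0], 1e-4)      # non-zero right-hand side

print(x_homogeneous)
print(x_general)
```

With a non-zero right-hand side the regularized solve lands close to the exact solution, so the regression approach looks reasonable for systems Ax = B with B != 0; for the homogeneous case it seems some extra constraint (for example fixing the norm of x) or a null-space computation would be needed.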
Re: Usage of spark-ec2: how to deploy a revised version of spark 1.1.0?
I modified the pom files in my private repo to use those parameters as defaults to solve the problem. But after the deployment, I found the installed version is not the customized version, but an official one. Could anyone please give a hint on how spark-ec2 works with Spark from private repos? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Usage-of-spark-ec2-how-to-deploy-a-revised-version-of-spark-1-1-0-tp16943p17067.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Shuffle issues in the current master
You may be running into this issue: https://issues.apache.org/jira/browse/SPARK-4019 You could check by having 2000 or fewer reduce partitions. On Wed, Oct 22, 2014 at 1:48 PM, DB Tsai dbt...@dbtsai.com wrote: PS, sorry for spamming the mailing list. Based on my knowledge, both spark.shuffle.spill.compress and spark.shuffle.compress default to true, so in theory, we should not run into this issue if we don't change any setting. Is there any other bug we run into? Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Oct 22, 2014 at 1:37 PM, DB Tsai dbt...@dbtsai.com wrote: Or can it be solved by setting both of the following settings to true for now? spark.shuffle.spill.compress true spark.shuffle.compress true Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Oct 22, 2014 at 1:34 PM, DB Tsai dbt...@dbtsai.com wrote: It seems that this issue should be addressed by https://github.com/apache/spark/pull/2890 ? Am I right? Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Oct 22, 2014 at 11:54 AM, DB Tsai dbt...@dbtsai.com wrote: Hi all, With SPARK-3948, the exception in Snappy PARSING_ERROR is gone, but I've another exception now. I've no clue about what's going on; has anyone run into a similar issue? Thanks. This is the configuration I use. 
spark.rdd.compress true
spark.shuffle.consolidateFiles true
spark.shuffle.manager SORT
spark.akka.frameSize 128
spark.akka.timeout 600
spark.core.connection.ack.wait.timeout 600
spark.core.connection.auth.wait.timeout 300

java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2325)
java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:57)
org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:57)
org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:95)
org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:351)
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:89)
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:56)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)

Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai
Re: Getting spark to use more than 4 cores on Amazon EC2
On a related note, how are you submitting your job? I have a simple streaming proof of concept and noticed that everything runs on my master. I wonder if I do not have enough load for spark to push tasks to the slaves. Thanks Andy From: Daniel Mahler dmah...@gmail.com Date: Monday, October 20, 2014 at 5:22 PM To: Nicholas Chammas nicholas.cham...@gmail.com Cc: user user@spark.apache.org Subject: Re: Getting spark to use more than 4 cores on Amazon EC2 I am using globs though: raw = sc.textFile("/path/to/dir/*/*") and I have tons of files, so 1 file per partition should not be a problem. On Mon, Oct 20, 2014 at 7:14 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: The biggest danger with gzipped files is this: raw = sc.textFile("/path/to/file.gz", 8); raw.getNumPartitions() # returns 1 You think you’re telling Spark to parallelize the reads on the input, but Spark cannot parallelize reads against gzipped files. So 1 gzipped file gets assigned to 1 partition. It might be a nice user hint if Spark warned when parallelism is disabled by the input format. Nick On Mon, Oct 20, 2014 at 6:53 PM, Daniel Mahler dmah...@gmail.com wrote: Hi Nicholas, Gzipping is an impressive guess! Yes, they are. My data sets are too large to make repartitioning viable, but I could try it on a subset. I generally have many more partitions than cores. This was happening before I started setting those configs. thanks Daniel On Mon, Oct 20, 2014 at 5:37 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Are you dealing with gzipped files by any chance? Does explicitly repartitioning your RDD to match the number of cores in your cluster help at all? How about if you don't specify the configs you listed and just go with defaults all around? On Mon, Oct 20, 2014 at 5:22 PM, Daniel Mahler dmah...@gmail.com wrote: I launch the cluster using vanilla spark-ec2 scripts. 
I just specify the number of slaves and instance type. On Mon, Oct 20, 2014 at 4:07 PM, Daniel Mahler dmah...@gmail.com wrote: I usually run interactively from the spark-shell. My data definitely has more than enough partitions to keep all the workers busy. When I first launch the cluster I first do: + cat EOF ~/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer spark.rdd.compress true spark.shuffle.consolidateFiles true spark.akka.frameSize 20 EOF copy-dir /root/spark/conf spark/sbin/stop-all.sh sleep 5 spark/sbin/start-all.sh + before starting the spark-shell or running any jobs. On Mon, Oct 20, 2014 at 2:57 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Perhaps your RDD is not partitioned enough to utilize all the cores in your system. Could you post a simple code snippet and explain what kind of parallelism you are seeing for it? And can you report on how many partitions your RDDs have? On Mon, Oct 20, 2014 at 3:53 PM, Daniel Mahler dmah...@gmail.com wrote: I am launching EC2 clusters using the spark-ec2 scripts. My understanding is that this configures spark to use the available resources. I can see that spark will use the available memory on larger instance types. However I have never seen spark running at more than 400% (using 100% on 4 cores) on machines with many more cores. Am I misunderstanding the docs? Is it just that high end ec2 instances get I/O starved when running spark? It would be strange if that consistently produced a 400% hard limit though. thanks Daniel
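To illustrate why one gzipped file ends up in one partition, here is a small sketch in plain Python (no Spark involved). It builds a gzip blob in memory and tries to start decoding at the beginning and at an arbitrary midpoint. The payload and the `can_decode_from` helper are made up for the illustration; the underlying point is that a gzip stream has no entry points other than its start, so a reader cannot be handed "the second half" of a file.

```python
import gzip
import zlib

# Synthetic payload standing in for a large text file.
payload = b"".join(b"line %d\n" % i for i in range(50000))
blob = gzip.compress(payload)


def can_decode_from(data, offset):
    """Return True if a gzip reader can start decoding at the given byte offset."""
    try:
        gzip.decompress(data[offset:])
        return True
    except (OSError, EOFError, zlib.error):
        return False


print(can_decode_from(blob, 0))               # reading from the start of the file
print(can_decode_from(blob, len(blob) // 2))  # reading from an arbitrary midpoint
```

Decoding from the start should succeed, while the midpoint attempt is expected to fail because the bytes there are not a valid gzip header. With many small .gz files (as with the glob above) this is not a problem, since each file can still be read by a different task.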
Re: ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId
Hi, I have managed to resolve it because a wrong setting. Please ignore this . Regards Arthur On 23 Oct, 2014, at 5:14 am, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote: 14/10/23 05:09:04 WARN ConnectionManager: All connections not cleaned up
Spark: Order by Failed, java.lang.NullPointerException
Hi, I got java.lang.NullPointerException. Please help! sqlContext.sql(select l_orderkey, l_linenumber, l_partkey, l_quantity, l_shipdate, L_RETURNFLAG, L_LINESTATUS from lineitem limit 10).collect().foreach(println); 2014-10-23 08:20:12,024 INFO [sparkDriver-akka.actor.default-dispatcher-31] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 41 (runJob at basicOperators.scala:136) finished in 0.086 s 2014-10-23 08:20:12,024 INFO [Result resolver thread-1] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 41.0, whose tasks have all completed, from pool 2014-10-23 08:20:12,024 INFO [main] spark.SparkContext (Logging.scala:logInfo(59)) - Job finished: runJob at basicOperators.scala:136, took 0.090129332 s [9001,6,-4584121,17,1997-01-04,N,O] [9002,1,-2818574,23,1996-02-16,N,O] [9002,2,-2449102,21,1993-12-12,A,F] [9002,3,-5810699,26,1994-04-06,A,F] [9002,4,-489283,18,1994-11-11,R,F] [9002,5,2169683,15,1997-09-14,N,O] [9002,6,2405081,4,1992-08-03,R,F] [9002,7,3835341,40,1998-04-28,N,O] [9003,1,1900071,4,1994-05-05,R,F] [9004,1,-2614665,41,1993-06-13,A,F] If order by L_LINESTATUS” is added then error: sqlContext.sql(select l_orderkey, l_linenumber, l_partkey, l_quantity, l_shipdate, L_RETURNFLAG, L_LINESTATUS from lineitem order by L_LINESTATUS limit 10).collect().foreach(println); 2014-10-23 08:22:08,524 INFO [main] parse.ParseDriver (ParseDriver.java:parse(179)) - Parsing command: select l_orderkey, l_linenumber, l_partkey, l_quantity, l_shipdate, L_RETURNFLAG, L_LINESTATUS from lineitem order by L_LINESTATUS limit 10 2014-10-23 08:22:08,525 INFO [main] parse.ParseDriver (ParseDriver.java:parse(197)) - Parse Completed 2014-10-23 08:22:08,526 INFO [main] metastore.HiveMetaStore (HiveMetaStore.java:logInfo(454)) - 0: get_table : db=boc_12 tbl=lineitem 2014-10-23 08:22:08,526 INFO [main] HiveMetaStore.audit (HiveMetaStore.java:logAuditEvent(239)) - ugi=hd ip=unknown-ip-addr cmd=get_table : db=boc_12 tbl=lineitem 
java.lang.NullPointerException at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:1262) at org.apache.spark.SparkContext.defaultMinPartitions(SparkContext.scala:1269) at org.apache.spark.sql.hive.HadoopTableReader.init(TableReader.scala:63) at org.apache.spark.sql.hive.execution.HiveTableScan.init(HiveTableScan.scala:68) at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$14.apply(HiveStrategies.scala:188) at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$14.apply(HiveStrategies.scala:188) at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:364) at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$.apply(HiveStrategies.scala:184) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54) at org.apache.spark.sql.execution.SparkStrategies$TakeOrdered$.apply(SparkStrategies.scala:191) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406) at 
org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at $iwC$$iwC$$iwC$$iwC.init(console:15) at $iwC$$iwC$$iwC.init(console:20) at $iwC$$iwC.init(console:22) at $iwC.init(console:24) at init(console:26) at .init(console:30) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at
version mismatch issue with spark breeze vector
I'm trying to submit a simple test code through spark-submit. The first portion of the code works fine, but some calls to the breeze vector library fail:
14/10/22 17:36:02 INFO CacheManager: Partition rdd_1_0 not found, computing it
14/10/22 17:36:02 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: breeze.linalg.SparseVector$mcD$sp.<init>([I[DILbreeze/storage/DefaultArrayValue;)V
at spark.kmeans.SparseVector.toBreeze(Vectors.scala:182)
at spark.kmeans.MyTest$$anonfun$1.apply(MyTest.scala:56)
at spark.kmeans.MyTest$$anonfun$1.apply(MyTest.scala:56)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
I ran this code with ~/tools/spark-1.1.0-bin-hadoop2.4/bin/spark-submit --class spark.kmeans.MyTest --master local target/reco-v1-jar-with-dependencies.jar My system Scala version is 2.10.4. The relevant sections in my pom are (I did not build with sbt, since it gave me some problems):
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.0.2</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-mllib_2.10</artifactId>
  <version>1.0.2</version>
</dependency>
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.4</version>
</dependency>
Thanks a lot Yang
Re: version mismatch issue with spark breeze vector
Hi Yang, It looks like your build file uses a different version of Spark than the one you are running against. I'd try building against the same version of Spark as you are running your application against (1.1.0). Also, what is your assembly/shading configuration for your build? Cheers, Holden :) On Wed, Oct 22, 2014 at 5:39 PM, Yang tedd...@gmail.com wrote: I'm trying to submit a simple test code through spark-submit. The first portion of the code works fine, but some calls to the breeze vector library fail:
14/10/22 17:36:02 INFO CacheManager: Partition rdd_1_0 not found, computing it
14/10/22 17:36:02 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: breeze.linalg.SparseVector$mcD$sp.<init>([I[DILbreeze/storage/DefaultArrayValue;)V
at spark.kmeans.SparseVector.toBreeze(Vectors.scala:182)
at spark.kmeans.MyTest$$anonfun$1.apply(MyTest.scala:56)
at spark.kmeans.MyTest$$anonfun$1.apply(MyTest.scala:56)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
I ran this code with ~/tools/spark-1.1.0-bin-hadoop2.4/bin/spark-submit --class spark.kmeans.MyTest --master local target/reco-v1-jar-with-dependencies.jar My system Scala version is 2.10.4. The relevant sections in my pom are (I did not build with sbt, since it gave me some problems):
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.0.2</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-mllib_2.10</artifactId>
  <version>1.0.2</version>
</dependency>
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.4</version>
</dependency>
Thanks a lot Yang -- Cell : 425-233-8271
Re: Getting spark to use more than 4 cores on Amazon EC2
Another wild guess: if your data is stored in S3, you might be running into an issue where the default jets3t properties limit the number of parallel S3 connections to 4. Consider increasing the max-thread-counts from here: http://www.jets3t.org/toolkit/configuration.html. On Tue, Oct 21, 2014 at 10:39 AM, Andy Davidson a...@santacruzintegration.com wrote: On a related note, how are you submitting your job? I have a simple streaming proof of concept and noticed that everything runs on my master. I wonder if I do not have enough load for spark to push tasks to the slaves. Thanks Andy From: Daniel Mahler dmah...@gmail.com Date: Monday, October 20, 2014 at 5:22 PM To: Nicholas Chammas nicholas.cham...@gmail.com Cc: user user@spark.apache.org Subject: Re: Getting spark to use more than 4 cores on Amazon EC2 I am using globs though: raw = sc.textFile("/path/to/dir/*/*") and I have tons of files, so 1 file per partition should not be a problem. On Mon, Oct 20, 2014 at 7:14 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: The biggest danger with gzipped files is this: raw = sc.textFile("/path/to/file.gz", 8); raw.getNumPartitions() # returns 1 You think you’re telling Spark to parallelize the reads on the input, but Spark cannot parallelize reads against gzipped files. So 1 gzipped file gets assigned to 1 partition. It might be a nice user hint if Spark warned when parallelism is disabled by the input format. Nick On Mon, Oct 20, 2014 at 6:53 PM, Daniel Mahler dmah...@gmail.com wrote: Hi Nicholas, Gzipping is an impressive guess! Yes, they are. My data sets are too large to make repartitioning viable, but I could try it on a subset. I generally have many more partitions than cores. This was happening before I started setting those configs. thanks Daniel On Mon, Oct 20, 2014 at 5:37 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Are you dealing with gzipped files by any chance? 
Does explicitly repartitioning your RDD to match the number of cores in your cluster help at all? How about if you don't specify the configs you listed and just go with defaults all around? On Mon, Oct 20, 2014 at 5:22 PM, Daniel Mahler dmah...@gmail.com wrote: I launch the cluster using vanilla spark-ec2 scripts. I just specify the number of slaves and instance type. On Mon, Oct 20, 2014 at 4:07 PM, Daniel Mahler dmah...@gmail.com wrote: I usually run interactively from the spark-shell. My data definitely has more than enough partitions to keep all the workers busy. When I first launch the cluster I first do: + cat EOF ~/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer spark.rdd.compress true spark.shuffle.consolidateFiles true spark.akka.frameSize 20 EOF copy-dir /root/spark/conf spark/sbin/stop-all.sh sleep 5 spark/sbin/start-all.sh + before starting the spark-shell or running any jobs. On Mon, Oct 20, 2014 at 2:57 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Perhaps your RDD is not partitioned enough to utilize all the cores in your system. Could you post a simple code snippet and explain what kind of parallelism you are seeing for it? And can you report on how many partitions your RDDs have? On Mon, Oct 20, 2014 at 3:53 PM, Daniel Mahler dmah...@gmail.com wrote: I am launching EC2 clusters using the spark-ec2 scripts. My understanding is that this configures spark to use the available resources. I can see that spark will use the available memory on larger instance types. However I have never seen spark running at more than 400% (using 100% on 4 cores) on machines with many more cores. Am I misunderstanding the docs? Is it just that high end ec2 instances get I/O starved when running spark? It would be strange if that consistently produced a 400% hard limit though. thanks Daniel
SparkSQL , best way to divide data into partitions?
Hi, I have a JSON file that can be loaded by sqlContext.jsonFile into a table, but this table is not partitioned. I then wish to transform this table into a partitioned table, say on the field "date". What would be the best approach to do this? It seems that in Hive this is usually done by loading data into a dedicated partition directly, but I don't want to select the data out by a specific partition value and then insert it, once per partition value. How should I do it in a quick way? And how do I do it in Spark SQL? raymond
Re: Spark Hive Snappy Error
Hi, Please find the attached file.

lsof -p 16459 (Master)

COMMAND PID   USER   FD  TYPE DEVICE SIZE/OFF  NODE    NAME
java    16459 tester cwd DIR  253,2  4096      6039786 /hadoop/spark-1.1.0_patched
java    16459 tester rtd DIR  253,0  4096      2       /
java    16459 tester txt REG  253,0  12150     2780995 /usr/lib/jvm/jdk1.7.0_67/bin/java
java    16459 tester mem REG  253,0  156928    2228230 /lib64/ld-2.12.so
java    16459 tester mem REG  253,0  1926680   2228250 /lib64/libc-2.12.so
java    16459 tester mem REG  253,0  145896    2228251 /lib64/libpthread-2.12.so
java    16459 tester mem REG  253,0  22536     2228254 /lib64/libdl-2.12.so
java    16459 tester mem REG  253,0  109006    2759278 /usr/lib/jvm/jdk1.7.0_67/lib/amd64/jli/libjli.so
java    16459 tester mem REG  253,0  599384    2228264 /lib64/libm-2.12.so
java    16459 tester mem REG  253,0  47064     2228295 /lib64/librt-2.12.so
java    16459 tester mem REG  253,0  113952    2228328 /lib64/libresolv-2.12.so
java    16459 tester mem REG  253,0  99158576  2388225 /usr/lib/locale/locale-archive
java    16459 tester mem REG  253,0  27424     2228249 /lib64/libnss_dns-2.12.so
java    16459 tester mem REG  253,2  138832345 6555616 /hadoop/spark-1.1.0_patched/assembly/target/scala-2.10/spark-assembly-1.1.0-hadoop2.4.1.jar
java    16459 tester mem REG  253,0  580624    2893171 /usr/lib/jvm/jdk1.7.0_67/jre/lib/jsse.jar
java    16459 tester mem REG  253,0  114742    2893221 /usr/lib/jvm/jdk1.7.0_67/jre/lib/amd64/libnet.so
java    16459 tester mem REG  253,0  91178     2893222 /usr/lib/jvm/jdk1.7.0_67/jre/lib/amd64/libnio.so
java    16459 tester mem REG  253,2  1769726   6816963 /hadoop/spark-1.1.0_patched/lib_managed/jars/datanucleus-rdbms-3.2.1.jar
java    16459 tester mem REG  253,2  337012    6816961 /hadoop/spark-1.1.0_patched/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar
java    16459 tester mem REG  253,2  1801810   6816962 /hadoop/spark-1.1.0_patched/lib_managed/jars/datanucleus-core-3.2.2.jar
java    16459 tester mem REG  253,2  25153     7079998 /hadoop/hive-0.12.0-bin/csv-serde-1.1.2-0.11.0-all.jar
java    16459 tester mem REG  253,2  21817     6032989 /hadoop/hbase-0.98.5-hadoop2/lib/gmbal-api-only-3.0.0-b023.jar
java    16459 tester mem REG  253,2  177131    6032940 /hadoop/hbase-0.98.5-hadoop2/lib/jetty-util-6.1.26.jar
java    16459 tester mem REG  253,2  32677     6032915 /hadoop/hbase-0.98.5-hadoop2/lib/hbase-hadoop-compat-0.98.5-hadoop2.jar
java    16459 tester mem REG  253,2  143602    6032959 /hadoop/hbase-0.98.5-hadoop2/lib/commons-digester-1.8.jar
java    16459 tester mem REG  253,2  97738     6032917 /hadoop/hbase-0.98.5-hadoop2/lib/hbase-prefix-tree-0.98.5-hadoop2.jar
java    16459 tester mem REG  253,2  17884     6032949 /hadoop/hbase-0.98.5-hadoop2/lib/jackson-jaxrs-1.8.8.jar
java    16459 tester mem REG  253,2  253086    6032987 /hadoop/hbase-0.98.5-hadoop2/lib/grizzly-http-2.1.2.jar
java    16459 tester mem REG  253,2  73778     6032916 /hadoop/hbase-0.98.5-hadoop2/lib/hbase-hadoop2-compat-0.98.5-hadoop2.jar
java    16459 tester mem REG  253,2  336904    6032993 /hadoop/hbase-0.98.5-hadoop2/lib/grizzly-http-servlet-2.1.2.jar
java    16459 tester mem REG  253,2  927415    6032914 /hadoop/hbase-0.98.5-hadoop2/lib/hbase-client-0.98.5-hadoop2.jar
java    16459 tester mem REG  253,2  125740    6033008 /hadoop/hbase-0.98.5-hadoop2/lib/hadoop-yarn-server-applicationhistoryservice-2.4.1.jar
java    16459 tester mem REG  253,2  15010     6032936 /hadoop/hbase-0.98.5-hadoop2/lib/xmlenc-0.52.jar
java    16459 tester mem REG  253,2  60686     6032926 /hadoop/hbase-0.98.5-hadoop2/lib/commons-logging-1.1.1.jar
java    16459 tester mem REG  253,2  259600    6032927 /hadoop/hbase-0.98.5-hadoop2/lib/commons-codec-1.7.jar
java    16459 tester mem REG  253,2  321806    6032957 /hadoop/hbase-0.98.5-hadoop2/lib/jets3t-0.6.1.jar
java    16459 tester mem REG  253,2  85353     6032982 /hadoop/hbase-0.98.5-hadoop2/lib/javax.servlet-api-3.0.1.jar
java    16459 tester mem
hive timestamp column always returns null
Hello Experts,

I created a table using the spark-sql CLI. No Hive is installed. I am using Spark 1.1.0.

    create table date_test(my_date timestamp) row format delimited fields terminated by ' ' lines terminated by '\n' LOCATION '/user/hive/date_test';

The data file has the following data:

    2014-12-11 00:00:00
    2013-11-11T00:00:00
    2012-11-11T00:00:00Z

When I query using

    select * from date_test

it returns:

    NULL
    NULL
    NULL

Could you please help me resolve this issue?

Thanks
Tridib

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/hive-timestamp-column-always-returns-null-tp17079.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Hive Snappy Error
Hi, May I know where to configure Spark to load libhadoop.so?

Regards
Arthur

On 23 Oct, 2014, at 11:31 am, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote:

Hi, Please find the attached file. lsof.rtf

my spark-default.xml

    # Default system properties included when running spark-submit.
    # This is useful for setting default environmental settings.
    #
    # Example:
    # spark.master spark://master:7077
    # spark.eventLog.enabled true
    # spark.eventLog.dir hdfs://namenode:8021/directory
    # spark.serializer org.apache.spark.serializer.KryoSerializer
    # spark.executor.memory 2048m
    spark.shuffle.spill.compress false
    spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec

my spark-env.sh

    #!/usr/bin/env bash
    export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar
    export CLASSPATH=$CLASSPATH:$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar
    export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native/Linux-amd64-64
    export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop}
    export SPARK_WORKER_DIR=/edh/hadoop_data/spark_work/
    export SPARK_LOG_DIR=/edh/hadoop_logs/spark
    export SPARK_LIBRARY_PATH=$HADOOP_HOME/lib/native/Linux-amd64-64
    export SPARK_CLASSPATH=$SPARK_HOME/lib_managed/jars/mysql-connector-java-5.1.31-bin.jar
    export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HBASE_HOME/lib/*:$HIVE_HOME/csv-serde-1.1.2-0.11.0-all.jar:
    export SPARK_WORKER_MEMORY=2g
    export HADOOP_HEAPSIZE=2000
    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=m35:2181,m33:2181,m37:2181"
    export SPARK_JAVA_OPTS=" -XX:+UseConcMarkSweepGC"

ll $HADOOP_HOME/lib/native/Linux-amd64-64

    -rw-rw-r--. 1 tester tester   50523 Aug 27 14:12 hadoop-auth-2.4.1.jar
    -rw-rw-r--. 1 tester tester 1062640 Aug 27 12:19 libhadoop.a
    -rw-rw-r--. 1 tester tester 1487564 Aug 27 11:14 libhadooppipes.a
    lrwxrwxrwx. 1 tester tester      24 Aug 27 07:08 libhadoopsnappy.so -> libhadoopsnappy.so.0.0.1
    lrwxrwxrwx. 1 tester tester      24 Aug 27 07:08 libhadoopsnappy.so.0 -> libhadoopsnappy.so.0.0.1
    -rwxr-xr-x. 1 tester tester   54961 Aug 27 07:08 libhadoopsnappy.so.0.0.1
    -rwxrwxr-x. 1 tester tester  630328 Aug 27 12:19 libhadoop.so
    -rwxrwxr-x. 1 tester tester  630328 Aug 27 12:19 libhadoop.so.1.0.0
    -rw-rw-r--. 1 tester tester  582472 Aug 27 11:14 libhadooputils.a
    -rw-rw-r--. 1 tester tester  298626 Aug 27 11:14 libhdfs.a
    -rwxrwxr-x. 1 tester tester  200370 Aug 27 11:14 libhdfs.so
    -rwxrwxr-x. 1 tester tester  200370 Aug 27 11:14 libhdfs.so.0.0.0
    lrwxrwxrwx. 1 tester tester      55 Aug 27 07:08 libjvm.so -> /usr/lib/jvm/jdk1.6.0_45/jre/lib/amd64/server/libjvm.so
    lrwxrwxrwx. 1 tester tester      25 Aug 27 07:08 libprotobuf-lite.so -> libprotobuf-lite.so.8.0.0
    lrwxrwxrwx. 1 tester tester      25 Aug 27 07:08 libprotobuf-lite.so.8 -> libprotobuf-lite.so.8.0.0
    -rwxr-xr-x. 1 tester tester  964689 Aug 27 07:08 libprotobuf-lite.so.8.0.0
    lrwxrwxrwx. 1 tester tester      20 Aug 27 07:08 libprotobuf.so -> libprotobuf.so.8.0.0
    lrwxrwxrwx. 1 tester tester      20 Aug 27 07:08 libprotobuf.so.8 -> libprotobuf.so.8.0.0
    -rwxr-xr-x. 1 tester tester 8300050 Aug 27 07:08 libprotobuf.so.8.0.0
    lrwxrwxrwx. 1 tester tester      18 Aug 27 07:08 libprotoc.so -> libprotoc.so.8.0.0
    lrwxrwxrwx. 1 tester tester      18 Aug 27 07:08 libprotoc.so.8 -> libprotoc.so.8.0.0
    -rwxr-xr-x. 1 tester tester 9935810 Aug 27 07:08 libprotoc.so.8.0.0
    -rw-r--r--. 1 tester tester  233554 Aug 27 15:19 libsnappy.a
    lrwxrwxrwx. 1 tester tester      23 Aug 27 11:32 libsnappy.so -> /usr/lib64/libsnappy.so
    lrwxrwxrwx. 1 tester tester      23 Aug 27 11:33 libsnappy.so.1 -> /usr/lib64/libsnappy.so
    -rwxr-xr-x. 1 tester tester  147726 Aug 27 07:08 libsnappy.so.1.2.0
    drwxr-xr-x. 2 tester tester    4096 Aug 27 07:08 pkgconfig

Regards
Arthur

On 23 Oct, 2014, at 10:57 am, Shao, Saisai saisai.s...@intel.com wrote:

Hi Arthur, I think your problem might be different from what SPARK-3958 (https://issues.apache.org/jira/browse/SPARK-3958) describes. Your problem seems more likely to be a library link problem. Would you mind checking your Spark runtime to see whether snappy.so is loaded or not (through lsof -p)? I guess your problem is more likely to be a library-not-found problem.

Thanks
Jerry
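The check Jerry suggests can be sketched as a small shell filter. This is a sketch under assumptions: `loaded_snappy_libs` is a hypothetical helper name, and it relies only on the file path being the last column of `lsof -p` output, as in the listing Arthur attached.

```shell
#!/bin/sh
# Hypothetical helper: read `lsof -p <pid>` output on stdin and print the
# paths of open or mapped files whose name mentions snappy (case-insensitive).
loaded_snappy_libs() {
    # $NF is the last whitespace-separated field, i.e. the NAME column.
    awk 'tolower($NF) ~ /snappy/ { print $NF }'
}

# Example on a pasted excerpt (the second row is made up for illustration):
printf '%s\n' \
    'java 16459 tester mem REG 253,0 156928 2228230 /lib64/ld-2.12.so' \
    'java 16459 tester mem REG 253,0 147726 1234567 /usr/lib64/libsnappy.so.1' |
    loaded_snappy_libs
```

Against a live process this would be run as `lsof -p 16459 | loaded_snappy_libs`. An empty result only shows that nothing with snappy in its path is open in that process at that moment; paths containing spaces are not handled by this sketch.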
Workaround for SPARK-1931 not compiling
Hi all,

I am new to Spark/GraphX and am trying to use partitioning strategies in GraphX on Spark 1.0.0. The workaround I saw on the main page does not seem to compile. The code I added was:

    def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
      val numPartitions = edges.partitions.size
      edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
        .partitionBy(new HashPartitioner(numPartitions))
        .mapPartitions(_.map(_._2), preservesPartitioning = true)
    }

    val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart).cache()
    val graph = Graph(unpartitionedGraph.vertices, partitionBy(unpartitionedGraph.edges, PartitionStrategy.EdgePartition2D))

The partitionBy function is the same as the workaround described in the official documentation. However, I am getting the following error:

    error: value partitionBy is not a member of org.apache.spark.rdd.RDD[(org.apache.spark.graphx.PartitionID, org.apache.spark.graphx.Edge[ED])]
    [INFO] possible cause: maybe a semicolon is missing before `value partitionBy'?
    [INFO] .partitionBy(new HashPartitioner(numPartitions))
    [INFO] ^

Please help me resolve the error.

Note: I can't upgrade Spark since I am only a client on the Spark cluster.

-- Arpit Kumar
Re: Spark as key/value store?
Thanks!

On Thu, Oct 23, 2014 at 10:56 AM, Akshat Aranya aara...@gmail.com wrote:

Yes, that is a downside of Spark's design in general. The only way to share data across consumers of data is by having a separate entity that owns the Spark context. That's the idea behind Ooyala's job server. The driver is still a single point of failure; if you lose the driver process, you lose all information about the RDDs.

On Oct 22, 2014 6:33 PM, Hajime Takase placeofnomemor...@gmail.com wrote:

Interesting. I see the interface of IndexedRDD, which seems to act like a key/value store within a given SparkContext. https://github.com/apache/spark/pull/1297 But different SparkContexts won't let their IndexedRDD be used by others, will they? (I want to use multiple drivers in my system.)

On Thu, Oct 23, 2014 at 1:01 AM, Akshat Aranya aara...@gmail.com wrote:

Spark, in general, is good for iterating through an entire dataset again and again. All operations are expressed in terms of iteration through all the records of at least one partition. You may want to look at IndexedRDD (https://issues.apache.org/jira/browse/SPARK-2365), which aims to improve point queries. In general though, Spark is unlikely to outperform KV stores because of the nature of scheduling a job for every operation.

On Wed, Oct 22, 2014 at 7:51 AM, Hajime Takase placeofnomemor...@gmail.com wrote:

Hi, Is it possible to use Spark as a clustered key/value store (say, like redis-cluster or hazelcast)? Will it outperform them in write/read or other operations? My main goal is to use the same RDD from several different SparkContexts without saving to disk or using the Spark job server, but I'm curious whether someone has already tried using Spark as a key/value store.

Thanks, Hajime
RE: Spark Hive Snappy Error
It seems you only added the snappy library to your classpath:

    export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar

But Spark itself depends on snappy-0.2.jar. Is it possible that this problem is caused by a different version of snappy?

Thanks
Jerry

From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com]
Sent: Thursday, October 23, 2014 11:32 AM
To: Shao, Saisai
Cc: arthur.hk.c...@gmail.com; user
Subject: Re: Spark Hive Snappy Error

Hi, Please find the attached file.

my spark-default.xml

    # Default system properties included when running spark-submit.
    # This is useful for setting default environmental settings.
    #
    # Example:
    # spark.master spark://master:7077
    # spark.eventLog.enabled true
    # spark.eventLog.dir hdfs://namenode:8021/directory
    # spark.serializer org.apache.spark.serializer.KryoSerializer
    # spark.executor.memory 2048m
    spark.shuffle.spill.compress false
    spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec

my spark-env.sh

    #!/usr/bin/env bash
    export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar
    export CLASSPATH=$CLASSPATH:$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar
    export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native/Linux-amd64-64
    export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop}
    export SPARK_WORKER_DIR=/edh/hadoop_data/spark_work/
    export SPARK_LOG_DIR=/edh/hadoop_logs/spark
    export SPARK_LIBRARY_PATH=$HADOOP_HOME/lib/native/Linux-amd64-64
    export SPARK_CLASSPATH=$SPARK_HOME/lib_managed/jars/mysql-connector-java-5.1.31-bin.jar
    export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HBASE_HOME/lib/*:$HIVE_HOME/csv-serde-1.1.2-0.11.0-all.jar:
    export SPARK_WORKER_MEMORY=2g
    export HADOOP_HEAPSIZE=2000
    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=m35:2181,m33:2181,m37:2181"
    export SPARK_JAVA_OPTS=" -XX:+UseConcMarkSweepGC"

ll $HADOOP_HOME/lib/native/Linux-amd64-64

    -rw-rw-r--. 1 tester tester   50523 Aug 27 14:12 hadoop-auth-2.4.1.jar
    -rw-rw-r--. 1 tester tester 1062640 Aug 27 12:19 libhadoop.a
    -rw-rw-r--. 1 tester tester 1487564 Aug 27 11:14 libhadooppipes.a
    lrwxrwxrwx. 1 tester tester      24 Aug 27 07:08 libhadoopsnappy.so -> libhadoopsnappy.so.0.0.1
    lrwxrwxrwx. 1 tester tester      24 Aug 27 07:08 libhadoopsnappy.so.0 -> libhadoopsnappy.so.0.0.1
    -rwxr-xr-x. 1 tester tester   54961 Aug 27 07:08 libhadoopsnappy.so.0.0.1
    -rwxrwxr-x. 1 tester tester  630328 Aug 27 12:19 libhadoop.so
    -rwxrwxr-x. 1 tester tester  630328 Aug 27 12:19 libhadoop.so.1.0.0
    -rw-rw-r--. 1 tester tester  582472 Aug 27 11:14 libhadooputils.a
    -rw-rw-r--. 1 tester tester  298626 Aug 27 11:14 libhdfs.a
    -rwxrwxr-x. 1 tester tester  200370 Aug 27 11:14 libhdfs.so
    -rwxrwxr-x. 1 tester tester  200370 Aug 27 11:14 libhdfs.so.0.0.0
    lrwxrwxrwx. 1 tester tester      55 Aug 27 07:08 libjvm.so -> /usr/lib/jvm/jdk1.6.0_45/jre/lib/amd64/server/libjvm.so
    lrwxrwxrwx. 1 tester tester      25 Aug 27 07:08 libprotobuf-lite.so -> libprotobuf-lite.so.8.0.0
    lrwxrwxrwx. 1 tester tester      25 Aug 27 07:08 libprotobuf-lite.so.8 -> libprotobuf-lite.so.8.0.0
    -rwxr-xr-x. 1 tester tester  964689 Aug 27 07:08 libprotobuf-lite.so.8.0.0
    lrwxrwxrwx. 1 tester tester      20 Aug 27 07:08 libprotobuf.so -> libprotobuf.so.8.0.0
    lrwxrwxrwx. 1 tester tester      20 Aug 27 07:08 libprotobuf.so.8 -> libprotobuf.so.8.0.0
    -rwxr-xr-x. 1 tester tester 8300050 Aug 27 07:08 libprotobuf.so.8.0.0
    lrwxrwxrwx. 1 tester tester      18 Aug 27 07:08 libprotoc.so -> libprotoc.so.8.0.0
    lrwxrwxrwx. 1 tester tester      18 Aug 27 07:08 libprotoc.so.8 -> libprotoc.so.8.0.0
    -rwxr-xr-x. 1 tester tester 9935810 Aug 27 07:08 libprotoc.so.8.0.0
    -rw-r--r--. 1 tester tester  233554 Aug 27 15:19 libsnappy.a
    lrwxrwxrwx. 1 tester tester      23 Aug 27 11:32 libsnappy.so -> /usr/lib64/libsnappy.so
    lrwxrwxrwx. 1 tester tester      23 Aug 27 11:33 libsnappy.so.1 -> /usr/lib64/libsnappy.so
    -rwxr-xr-x. 1 tester tester  147726 Aug 27 07:08 libsnappy.so.1.2.0
    drwxr-xr-x. 2 tester tester    4096 Aug 27 07:08 pkgconfig

Regards
Arthur

On 23 Oct, 2014, at 10:57 am, Shao, Saisai saisai.s...@intel.com wrote:

Hi Arthur, I think your problem might be different from what SPARK-3958 (https://issues.apache.org/jira/browse/SPARK-3958) describes. Your problem seems more likely to be a library link problem. Would you mind checking your Spark runtime to see whether snappy.so is loaded or not (through lsof -p)? I guess your problem is more likely to be a library-not-found problem.

Thanks
Jerry
scalac crash when compiling DataTypeConversions.scala
I started building Spark / running Spark tests this weekend and on maybe 5-10 occasions have run into a compiler crash while compiling DataTypeConversions.scala. Here https://gist.github.com/ryan-williams/7673d7da928570907f4d is a full gist of an innocuous test command (mvn test -Dsuites='*KafkaStreamSuite') exhibiting this behavior. Problem starts on L512 https://gist.github.com/ryan-williams/7673d7da928570907f4d#file-stdout-L512 and there’s a final stack trace at the bottom https://gist.github.com/ryan-williams/7673d7da928570907f4d#file-stdout-L671 . mvn clean or ./sbt/sbt clean “fix” it (I believe I’ve observed the issue while compiling with each tool), but are annoying/time-consuming to do, obvs, and it’s happening pretty frequently for me when doing only small numbers of incremental compiles punctuated by e.g. checking out different git commits. Have other people seen this? This post http://apache-spark-user-list.1001560.n3.nabble.com/spark-github-source-build-error-td10532.html on this list is basically the same error, but in TestSQLContext.scala and this SO post http://stackoverflow.com/questions/25211071/compilation-errors-in-spark-datatypeconversions-scala-on-intellij-when-using-m claims to be hitting it when trying to build in intellij. It seems likely to be a bug in scalac; would finding a consistent repro case and filing it somewhere be useful?
About Memory usage in the Spark UI
Hi, please take a look at the attached screenshot. I wonder what the Memory Used column means. I give 2GB of memory to the driver process and 12GB of memory to the executor process. Thank you!
how to run a dev spark project without fully rebuilding the fat jar ?
During tests, I often modify my code a little bit and want to see the result, but spark-submit requires the full fat jar, which takes quite a lot of time to build. I just need to run in --master local mode. Is there a way to run it without rebuilding the fat jar?

thanks
Yang
Re: spark 1.1.0/yarn hang
We have narrowed this hanging issue down to the Calliope package that we used to create an RDD by reading a Cassandra table. The Calliope native RDD interface seems to hang, and I have decided to switch to the Calliope cql3 RDD interface.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-1-0-yarn-hang-tp16396p17087.html
Re: how to run a dev spark project without fully rebuilding the fat jar ?
I think you can give a list of jars, not just one, to spark-submit, so build only the one whose source code has changed.

On Wed, Oct 22, 2014 at 10:29 PM, Yang tedd...@gmail.com wrote:

During tests, I often modify my code a little bit and want to see the result, but spark-submit requires the full fat jar, which takes quite a lot of time to build. I just need to run in --master local mode. Is there a way to run it without rebuilding the fat jar?

thanks
Yang
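One way to apply this suggestion is sketched below. spark-submit's `--jars` option takes a comma-separated list of extra jars, so dependency jars that were built once can be passed alongside a small application jar that is rebuilt on each change. The class name, the jar paths, and the `submit_cmd` helper are all hypothetical; the function only prints the command, so it can be inspected without Spark installed.

```shell
#!/bin/sh
# Hypothetical names: com.example.Main and the jar paths are placeholders.
# Usage: submit_cmd APP_JAR MAIN_CLASS DEP_JAR...
# Prints a spark-submit command line instead of running it.
submit_cmd() {
    app_jar=$1
    main_class=$2
    shift 2
    # --jars expects a comma-separated list, so join the remaining arguments with commas.
    deps=$(IFS=,; echo "$*")
    echo "spark-submit --master local --class $main_class --jars $deps $app_jar"
}

submit_cmd target/app.jar com.example.Main lib/dep-a.jar lib/dep-b.jar
```

With this layout only `target/app.jar` needs rebuilding after a code change, assuming the build is configured to package the application classes separately from their dependencies.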