Re: Unable to start Spark 1.3 after building: java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
Adding a hadoop-2.6 profile is not necessary. Use hadoop-2.4, which already exists and is intended for 2.4+. In fact this declaration is missing things that Hadoop 2 needs.

On Thu, Dec 18, 2014 at 3:46 AM, Kyle Lin kylelin2...@gmail.com wrote: Hi there, these are my steps, and I got the same exception as Daniel's. Another question: how can I build a tgz file like the pre-built one I download from the official website?

1. Download trunk from git.
2. Add the following lines to pom.xml:

+    <profile>
+      <id>hadoop-2.6</id>
+      <properties>
+        <hadoop.version>2.6.0</hadoop.version>
+        <protobuf.version>2.5.0</protobuf.version>
+        <jets3t.version>0.9.0</jets3t.version>
+      </properties>
+    </profile>

3. Run: mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package
4. In $SPARK_HOME, run the following command:

./bin/spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi lib/spark-examples*.jar 10

Kyle

2014-12-18 2:24 GMT+08:00 Daniel Haviv danielru...@gmail.com: Thanks for your replies. I was building Spark from trunk. Daniel

On Dec 17, 2014, at 19:49, Nicholas Chammas nicholas.cham...@gmail.com wrote: Thanks for the correction, Sean. Do the docs need to be updated on this point, or is it safer for now just to note 2.4 specifically?

On Wed Dec 17 2014 at 5:54:53 AM Sean Owen so...@cloudera.com wrote: Spark works fine with 2.4 *and later*. The docs don't mean to imply 2.4 is the last supported version.

On Wed, Dec 17, 2014 at 10:19 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Spark 1.3 does not exist. Spark 1.2 hasn't been released just yet. Which version of Spark did you mean? Also, from what I can see in the docs, I believe the latest version of Hadoop that Spark supports is 2.4, not 2.6. Nick

On Wed Dec 17 2014 at 2:09:56 AM Kyle Lin kylelin2...@gmail.com wrote: I also got the same problem.

2014-12-09 22:58 GMT+08:00 Daniel Haviv danielru...@gmail.com: Hi, I've built Spark 1.3 with Hadoop 2.6, but when I start up the spark-shell I get the following exception:

14/12/09 06:54:24 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/12/09 06:54:24 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
14/12/09 06:54:24 INFO ui.SparkUI: Started SparkUI at http://hdname:4040
14/12/09 06:54:25 INFO impl.TimelineClientImpl: Timeline service address: http://0.0.0.0:8188/ws/v1/timeline/
java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)

Any idea why? Thanks, Daniel
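[Regarding Kyle's question about producing a tgz like the pre-built downloads: the Spark 1.x source tree ships a make-distribution.sh script at the repository root for exactly this. A rough sketch, assuming the hadoop-2.4 profile Sean recommends; the exact flags vary a little between versions:

./make-distribution.sh --tgz --name custom-hadoop2.6 -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0

This produces a spark-<version>-bin-<name>.tgz in the project root, analogous to the official pre-built packages.]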
Can Spark 1.0.2 run on CDH-4.3.0 with yarn? And Will Spark 1.2.0 support CDH5.1.2 with yarn?
I was not able to compile the Spark 1.1.0 source code for CDH4.3.0 with YARN. Does Spark support CDH4.3.0 with YARN? And will Spark 1.2.0 support CDH5.1.2?
Re: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary
Well, it's always a good idea to use matched binary versions; here it is more acutely necessary. You can use a pre-built binary -- if you use it to compile and also to run. Why does it not make sense to publish artifacts? Not sure what you mean about core vs assembly, as the assembly contains all of the modules. You don't literally need the same jar file.

On Thu, Dec 18, 2014 at 3:20 AM, Sun, Rui rui@intel.com wrote: Not using spark-submit. The app directly communicates with the Spark cluster in standalone mode. If I mark the Spark dependency as 'provided', then a spark-core jar elsewhere must be pointed to in the CLASSPATH. However, the pre-built Spark binary only has an assembly jar, not individual module jars, so you don't have a chance to point to a module jar that is the same binary as the one in the pre-built Spark distribution. Maybe the Spark distribution should contain not only the assembly jar but also the individual module jars. Any opinion?

From: Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu]
Sent: Thursday, December 18, 2014 2:20 AM
To: Sean Owen
Cc: Sun, Rui; user@spark.apache.org
Subject: Re: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary

Just to clarify, are you running the application using spark-submit after packaging with sbt package? One thing that might help is to mark the Spark dependency as 'provided', as then you shouldn't have the Spark classes in your jar. Thanks, Shivaram

On Wed, Dec 17, 2014 at 4:39 AM, Sean Owen so...@cloudera.com wrote: You should use the same binaries everywhere. The problem here is that anonymous functions get compiled to (potentially) different names when you build differently, so you actually have one function being called when another function is meant.

On Wed, Dec 17, 2014 at 12:07 PM, Sun, Rui rui@intel.com wrote: Hi, I encountered a weird bytecode incompatibility issue between the spark-core jar from the mvn repo and the official Spark pre-built binary. Steps to reproduce:

1. Download the official pre-built Spark binary 1.1.1 at http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop1.tgz
2. Launch the Spark cluster in pseudo-cluster mode
3. Run a small Scala app which calls RDD.saveAsObjectFile():

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.1"
)

val sc = new SparkContext(args(0), "test")  // args(0) is the Spark master URI
val rdd = sc.parallelize(List(1, 2, 3))
rdd.saveAsObjectFile("/tmp/mysaoftmp")
sc.stop

This throws an exception as follows:

[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, ray-desktop.sh.intel.com): java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.collection.Iterator
[error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
[error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
[error] org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
[error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
[error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
[error] org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
[error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
[error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
[error] org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
[error] org.apache.spark.scheduler.Task.run(Task.scala:54)
[error] org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
[error] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
[error] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[error] java.lang.Thread.run(Thread.java:701)

After investigation, I found that this is caused by a bytecode incompatibility between RDD.class in spark-core_2.10-1.1.1.jar and the pre-built Spark assembly. This issue also happens with Spark 1.1.0. Is there anything wrong in my usage of Spark? Or anything wrong in the process of deploying Spark module jars to the maven repo?
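[One common way to follow Shivaram's 'provided' suggestion is in the sbt build itself, so the application jar never bundles its own copy of the Spark classes and the cluster's pre-built assembly is the only copy at runtime. A minimal build.sbt sketch, using the versions discussed in this thread (the project name is a placeholder):

name := "my-spark-app"

scalaVersion := "2.10.4"

// "provided" keeps spark-core out of the packaged jar; spark-submit or the
// cluster CLASSPATH supplies the matching Spark classes at runtime.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1" % "provided"
]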
Re: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary
@Rui do you mean the spark-core jar in the maven central repo is incompatible with the same version of the official pre-built Spark binary? That's really weird. I thought they should have been built from the same code. Best Regards, Shixiong Zhu

2014-12-18 17:22 GMT+08:00 Sean Owen so...@cloudera.com: Well, it's always a good idea to use matched binary versions; here it is more acutely necessary. You can use a pre-built binary -- if you use it to compile and also to run. Why does it not make sense to publish artifacts? Not sure what you mean about core vs assembly, as the assembly contains all of the modules. You don't literally need the same jar file.
Re: Can Spark 1.0.2 run on CDH-4.3.0 with yarn? And Will Spark 1.2.0 support CDH5.1.2 with yarn?
The question is really: will Spark 1.1 work with a particular version of YARN? Many, but not all, versions of YARN are supported. The stable versions (2.2.x+) are; before that, support is patchier, and in fact it has been removed in Spark 1.3.

The yarn profile supports stable YARN, which is roughly 2.2.x and onwards. The yarn-alpha profile should work for YARN around 0.23.x. 2.0.x and 2.1.x were a sort of beta period; I recall that yarn-alpha works with some of it, but not all, and there is no yarn-beta profile. I believe early CDH 4.x has basically YARN beta, and later 4.x has stable. I think I'd try the yarn-alpha profile and see if it compiles, but the version of YARN in that release may well be among those that fall in the gap between alpha and stable support.

Thankfully things got a lot more stable past Hadoop / YARN 2.2 or so, so it far more often just works without version issues. And CDH 5 is based on Hadoop 2.3 and then 2.5, so you should be much more able to build whatever versions together that you want. CDH 5.1.x ships Spark 1.0.x. There should be no problem using 1.1.x, 1.2.x, etc. with it; you just need to make and support your own binaries. 5.2.x has 1.1.x; 5.3.x will have 1.2.x.

On Thu, Dec 18, 2014 at 9:18 AM, Canoe canoe...@gmail.com wrote: I was not able to compile the Spark 1.1.0 source code for CDH4.3.0 with YARN. Does Spark support CDH4.3.0 with YARN? And will Spark 1.2.0 support CDH5.1.2?
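[For concreteness, builds along the lines Sean describes might look like the commands below, in the same style as the mvn invocations earlier in this digest. The CDH artifact version strings are assumptions for illustration, and as discussed above, the yarn-alpha profile may or may not accept the YARN that ships with CDH 4.3.0:

mvn -Pyarn-alpha -Dhadoop.version=2.0.0-cdh4.3.0 -DskipTests clean package

mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0-cdh5.1.2 -DskipTests clean package
]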
Semantics of foreachPartition()
Hi, I have the following code in my application:

tmpRdd.foreach(item => {
  println("abc: " + item)
})

tmpRdd.foreachPartition(iter => {
  iter.map(item => {
    println("xyz: " + item)
  })
})

In the output, I see only the "abc" prints (i.e., from the foreach() call). (The result is the same also if I exchange the order.) What exactly is the meaning of foreachPartition and how would I use it correctly? Thanks, Tobias
Re: Can Spark 1.0.2 run on CDH-4.3.0 with yarn? And Will Spark 1.2.0 support CDH5.1.2 with yarn?
Hi Sean, thank you for your reply. I will try to use Spark 1.1 and 1.2 on CDH 5.x. :)

2014-12-18 17:38 GMT+08:00 Sean Owen so...@cloudera.com: The question is really: will Spark 1.1 work with a particular version of YARN? Many, but not all, versions of YARN are supported.

-- "Who says the river is wide? A single reed can sail across it."
Re: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary
Have a look at https://issues.apache.org/jira/browse/SPARK-2075

It's not quite that the API is different, but indeed building different 'flavors' of the same version (hadoop1 vs 2) can strangely lead to this problem, even though the public API is identical and in theory the API is completely separate from the backend bindings. IIRC the idea is that only submitting via spark-submit is really supported, because there you're definitely running exactly what's on your cluster. That should always work. This sort of gotcha turns up in some specific cases, but you can always work around it by matching your embedded Spark version as well.

On Thu, Dec 18, 2014 at 9:38 AM, Shixiong Zhu zsxw...@gmail.com wrote: @Rui do you mean the spark-core jar in the maven central repo is incompatible with the same version of the official pre-built Spark binary? That's really weird. I thought they should have been built from the same code.
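[To illustrate the spark-submit route Sean mentions, a sketch of deploying the sbt-packaged jar with the cluster's own launcher; the class name, jar path and master URL are placeholders:

sbt package

./bin/spark-submit --master spark://master-host:7077 \
  --class com.example.MyApp \
  /path/to/myapp_2.10-0.1.jar

Because spark-submit puts the installed assembly on the classpath, the Spark classes executed at runtime exactly match the pre-built binary running on the cluster.]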
Re: Semantics of foreachPartition()
Hi again,

On Thu, Dec 18, 2014 at 6:43 PM, Tobias Pfeiffer t...@preferred.jp wrote:

tmpRdd.foreachPartition(iter => {
  iter.map(item => {
    println("xyz: " + item)
  })
})

Uh, with iter.foreach(...) it works... the reason apparently being that iter.map() itself returns an iterator and is thus evaluated lazily (in this case: never), while iter.foreach() is evaluated immediately. Thanks, Tobias
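[A minimal sketch of the difference, assuming an existing SparkContext sc; this is illustrative only, not code from the thread:

val tmpRdd = sc.parallelize(1 to 10)

// Prints: foreach eagerly drains the partition's iterator.
tmpRdd.foreachPartition(iter => iter.foreach(item => println("xyz: " + item)))

// Prints nothing: map only builds a lazy iterator, which is then discarded
// without ever being consumed.
tmpRdd.foreachPartition(iter => iter.map(item => println("xyz: " + item)))
]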
Re: java.io.NotSerializableException: org.apache.avro.mapred.AvroKey using spark with avro
I did not encounter this with my Avro records using Spark 1.1.0 (see https://github.com/medale/spark-mail/blob/master/analytics/src/main/scala/com/uebercomputing/analytics/basic/UniqueSenderCounter.scala). I do use the default Java serialization, but all the fields in my Avro object are Serializable (no bytes/ByteBuffer). Does your Avro schema use bytes? If so, it seems that is wrapped in ByteBuffer, which is not Serializable. A quick search turns up a fix here: https://groups.google.com/forum/#!topic/spark-users/6HQPuxsCe0c Hope this helps, Markus

On 12/17/2014 08:14 PM, touchdown wrote: Yeah, I have the same problem with 1.1.0, but not 1.0.0.
Re: Providing query dsl to Elasticsearch for Spark (2.1.0.Beta3)
Quick follow-up: this works sweetly with spark-1.1.1-bin-hadoop2.4.

On Dec 3, 2014, at 3:31 PM, Ian Wilkinson ia...@me.com wrote: Hi, I'm trying the Elasticsearch support for Spark (2.1.0.Beta3). In the following I provide the query (as query DSL):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object TryES {
  val sparkConf = new SparkConf().setAppName("Campaigns")
  sparkConf.set("es.nodes", "es_cluster:9200")
  sparkConf.set("es.nodes.discovery", "false")
  val sc = new SparkContext(sparkConf)

  def main(args: Array[String]) {
    val query = """{ "query": { ... } }"""
    val campaigns = sc.esRDD(resource, query)
    campaigns.count()
  }
}

However when I submit this (using spark-1.1.0-bin-hadoop2.4), I am experiencing the following exceptions:

14/12/03 14:55:27 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/03 14:55:27 INFO scheduler.DAGScheduler: Failed to run count at TryES.scala:...
Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot open stream for resource { query: { ... } }

Is the query DSL supported with esRDD, or am I missing something more fundamental? Huge thanks, ian
Re: SPARK-2243 Support multiple SparkContexts in the same JVM
Yes, although once you have multiple ClassLoaders, you are operating as if in multiple JVMs for most intents and purposes. I think the request for this kind of functionality comes from use cases where multiple ClassLoaders wouldn't work, like wanting to have one app (in one ClassLoader) managing multiple contexts.

On Thu, Dec 18, 2014 at 2:23 AM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Greetings, the first comment on the issue says that the reason for not supporting multiple contexts is: "There are numerous assumptions in the code base that uses a shared cache or thread local variables or some global identifiers which prevent us from using multiple SparkContext's." Might it be worked around by creating those contexts in several classloaders, each with its own copy of the Spark classes? Thanks, Anton
Incorrect results when calling collect() ?
Hi, I'm getting some seemingly invalid results when I collect an RDD. This is happening in both Spark 1.1.0 and 1.2.0, using Java 8 on Mac. See the following code snippet:

JavaRDD<Thing> rdd = pairRDD.values();
rdd.foreach(e -> System.out.println("RDD Foreach: " + e));
rdd.collect().forEach(e -> System.out.println("Collected Foreach: " + e));

I would expect the results from the two outputters to be identical, but instead I see:

RDD Foreach: Thing1
RDD Foreach: Thing2
RDD Foreach: Thing3
RDD Foreach: Thing4
(…snip…)
Collected Foreach: Thing1
Collected Foreach: Thing1
Collected Foreach: Thing1
Collected Foreach: Thing2

So essentially all the valid entries except one are replaced by an equivalent number of duplicate objects. I've tried various map and filter operations, but the results in the RDD always appear correct until I try to collect() the results. I've also found that calling cache() on the RDD materialises the duplication, such that the RDD Foreach displays the duplicates too... Any suggestions for how I can go about debugging this would be massively appreciated. Cheers, Tristan
Can we specify driver running on a specific machine of the cluster on yarn-cluster mode?
Hi all, in yarn-cluster mode, can we make the driver run on a specific machine that we choose in the cluster? Or even on a machine that is not in the cluster?
Re: Implementing a spark version of Haskell's partition
NP man,

The thing is that since you're in a distributed env, it'd be cumbersome to do that. Remember that Spark works basically on blocks/partitions; they are the unit of distribution and parallelization. That means that actions have to be run against them **after having been scheduled on the cluster**. The latter point is the most important: it means that the RDDs aren't really created on the driver; the collection is created/transformed/... on the partitions. As a consequence, you cannot, on the driver, create such a representation of the distributed collection, since you haven't seen it yet.

That being said, you can only prepare/define some computations on the driver that will segregate the data by applying a filter on the nodes. If you want to keep the RDD operators as they are, yes, you'll need to pass over the distributed data twice. The option of using mapPartitions, for instance, would be to create an RDD[(Seq[A], Seq[A])]; however it's going to be tricky because you might have to repartition, otherwise OOMs might blow up in your face :-D. I wouldn't pick that one!

A final note: looping over the data twice is not that much of a problem (especially if you can cache it), and in fact it's way better to keep the advantage of the resilience etc. that comes with Spark.

my2c
andy

On Wed Dec 17 2014 at 7:07:05 PM Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi Andy, thanks for your response. I already thought about filtering twice, that was what I meant with "that would be equivalent to applying filter twice", but I was thinking about whether I could do it in a single pass, so that it could later be generalized to an arbitrary number of classes. I would also like to be able to generate RDDs instead of partitions of a single RDD, so I could use RDD methods like stats() on the fragments. But I think there is currently no RDD method that returns more than one RDD for a single input RDD, so maybe there is some design limitation in Spark that prevents this? Again, thanks for your answer. Greetings, Juan

On 17/12/2014 18:15, andy petrella andy.petre...@gmail.com wrote: yo, First, here is the Scala version: http://www.scala-lang.org/api/current/index.html#scala.collection.Seq@partition(p:A= Boolean):(Repr,Repr) Second: an RDD is distributed, so what you'll have to do is either partition each partition (:-D) or create two RDDs by filtering twice → hence tasks will be scheduled distinctly, and the data read twice. Choose what's best for you! hth, andy

On Wed Dec 17 2014 at 5:57:56 PM Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi all, I would like to be able to split an RDD in two pieces according to a predicate. That would be equivalent to applying filter twice, with the predicate and its complement, which is also similar to Haskell's partition list function (http://hackage.haskell.org/package/base-4.7.0.1/docs/Data-List.html). Is there currently any way to do this in Spark? Or maybe anyone has a suggestion about how to implement this by modifying the Spark source. I think this is valuable because sometimes I need to split an RDD into several groups that are too big to fit in the memory of a single thread, so pair RDDs are not a solution for those cases. A generalization to n parts of Haskell's partition would do the job. Thanks a lot for your help. Greetings, Juan Rodriguez
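[A sketch of the two-pass approach Andy describes — splitting by filtering twice, with a cache so the parent data is not re-read for the second pass. This is an illustration, not code from the thread:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Split an RDD by a predicate into (matching, non-matching), Haskell-partition style.
def partitionRDD[A: ClassTag](rdd: RDD[A])(p: A => Boolean): (RDD[A], RDD[A]) = {
  val cached = rdd.cache()   // avoid recomputing/re-reading the parent twice
  (cached.filter(p), cached.filter(x => !p(x)))
}

Both results are full RDDs, so methods like stats() can be applied to each fragment, at the cost of scheduling two filter jobs.]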
Re: Incorrect results when calling collect() ?
It sounds a lot like your values are mutable classes and you are mutating or reusing them somewhere? It might work until you actually try to materialize them all and find that many point to the same object.

On Thu, Dec 18, 2014 at 10:06 AM, Tristan Blakers tris...@blackfrog.org wrote: Hi, I'm getting some seemingly invalid results when I collect an RDD. This is happening in both Spark 1.1.0 and 1.2.0, using Java 8 on Mac.
Re: Implementing a spark version of Haskell's partition
Hi Andy, thanks again for your thoughts on this. I haven't found much information about the internals of Spark, so I find these kinds of explanations of its low-level mechanisms very useful and interesting. It's also nice to know that the two-pass approach is a viable solution. Regards, Juan

2014-12-18 11:10 GMT+01:00 andy petrella andy.petre...@gmail.com: NP man, The thing is that since you're in a dist env, it'd be cumbersome to do that.
Re: Help with updateStateByKey
Another point to start playing with updateStateByKey is the example StatefulNetworkWordCount. See the streaming examples directory in the Spark repository. TD

On Thu, Dec 18, 2014 at 6:07 AM, Pierce Lamb richard.pierce.l...@gmail.com wrote: I am trying to run stateful Spark Streaming computations over (fake) apache web server logs read from Kafka. The goal is to sessionize the web traffic similar to this blog post: http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/

The only difference is that I want to sessionize each page the IP hits, instead of the entire session. I was able to do this reading from a file of fake web traffic using Spark in batch mode, but now I want to do it in a streaming context. Log files are read from Kafka and parsed into K/V pairs of (String, (String, Long, Long)), i.e. (IP, (requestPage, time, time)). I then call groupByKey() on this K/V pair. In batch mode, this would produce a (String, CollectionBuffer((String, Long, Long), ...)), i.e. (IP, CollectionBuffer((requestPage, time, time), ...)). In a StreamingContext, it produces a (String, ArrayBuffer((String, Long, Long), ...)), like so:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))

However, as the next microbatch (DStream) arrives, this information is discarded. Ultimately what I want is for that ArrayBuffer to fill up over time as a given IP continues to interact, and to run some computations on its data to sessionize the page time. I believe the operator to make that happen is updateStateByKey. I'm having some trouble with this operator (I'm new to both Spark and Scala); any help is appreciated. Thus far:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)

def updateGroupByKey(
    a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
    b: Option[(String, ArrayBuffer[(String, Long, Long)])]
  ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
}
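[For reference, a sketch of one possible shape for this state update. Note that updateStateByKey's update function receives only the values seen for a key in the current batch plus the previous state — the key itself is not an argument — so the groupByKey() step isn't needed, and checkpointing must be enabled for stateful operations. The names mirror the thread; the body is an illustration under those assumptions, not the canonical solution:

import scala.collection.mutable.ArrayBuffer

// ipTimeStamp: DStream[(String, (String, Long, Long))] as described above,
// i.e. (IP, (requestPage, startTime, endTime))
def updateGroupByKey(
    newHits: Seq[(String, Long, Long)],
    state: Option[ArrayBuffer[(String, Long, Long)]]
  ): Option[ArrayBuffer[(String, Long, Long)]] = {
  val buf = state.getOrElse(ArrayBuffer.empty[(String, Long, Long)])
  buf ++= newHits              // accumulate this IP's page hits across batches
  Some(buf)
}

ssc.checkpoint("/tmp/streaming-checkpoint")   // required for stateful ops; path is a placeholder
val sessions = ipTimeStamp.updateStateByKey(updateGroupByKey _)
]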
Re: Spark Streaming Python APIs?
A more updated version of the streaming programming guide is here: http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html Please refer to this until we make the official release of Spark 1.2. TD

On Tue, Dec 16, 2014 at 3:50 PM, smallmonkey...@hotmail.com smallmonkey...@hotmail.com wrote: Hi zhu: maybe there is no Python API for Spark Streaming yet. baishuo smallmonkey...@hotmail.com

From: Xiaoyong Zhu
Date: 2014-12-15 10:52
To: user@spark.apache.org
Subject: Spark Streaming Python APIs?

Hi spark experts, are there any Python APIs for Spark Streaming? I didn't find the Python APIs in the Spark Streaming programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html Xiaoyong
Re: Incorrect results when calling collect() ?
I suspected the same thing, but because the underlying data classes are deserialised by Avro I think they have to be mutable, as you need to provide the no-args constructor with settable fields. Nothing is being cached in my code anywhere, and this can be reproduced using data directly out of the newAPIHadoopRDD() call. Debugs added to the constructors of the various classes show that the right number are being constructed, though the watches set on some of the fields aren't always triggering, so I suspect maybe the serialisation is doing something a bit too clever? Tristan

On 18 December 2014 at 21:25, Sean Owen so...@cloudera.com wrote: It sounds a lot like your values are mutable classes and you are mutating or reusing them somewhere? It might work until you actually try to materialize them all and find that many point to the same object.
Re: Incorrect results when calling collect() ?
Being mutable is fine; reusing and mutating the objects is the issue. And yes, the objects you get back from Hadoop are reused by Hadoop InputFormats. You should just map the objects to a clone before using them where you need them to all exist independently at once, like before a collect(). (That said... generally speaking collect() involves copying from workers to the driver, which necessarily means a copy anyway. I suspect this isn't working that way for you since you're running it all locally?)

On Thu, Dec 18, 2014 at 10:42 AM, Tristan Blakers tris...@blackfrog.org wrote: I suspected the same thing, but because the underlying data classes are deserialised by Avro I think they have to be mutable, as you need to provide the no-args constructor with settable fields.
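[A sketch of the clone-before-collect idea, in Scala. The record class Thing and its generated Avro builder are assumptions for illustration; the point is simply to map each reused Hadoop/Avro object to an independent copy before collecting or caching:

// avroRdd: RDD[(AvroKey[Thing], NullWritable)] from newAPIHadoopRDD. Hadoop reuses
// the same AvroKey instance, so copy the datum into a fresh object per record.
val safeRdd = avroRdd.map { case (key, _) =>
  Thing.newBuilder(key.datum()).build()   // Thing and its builder API are assumptions here
}
val collected = safeRdd.collect()
]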
create table in yarn-cluster mode vs yarn-client mode
Hi, I have a simple app where I am trying to create a table. I am able to create the table when running the app in yarn-client mode, but not in yarn-cluster mode. Is this some known issue? Has this already been fixed? Please note that I am using spark-1.1 over hadoop-2.4.0.

App:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

object HiveSpark {
  case class Record(key: Int, value: String)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HiveSpark")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)
    import hiveContext._

    hql("use ttt")
    hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

    sc.stop()
  }
}

Thanks, Chirag
RE: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary
Owen, since we have individual module jars published into the central maven repo for an official release, we need to make sure the official Spark assembly jar is assembled exactly from these jars, so that there is no binary compatibility issue. We could also publish the official assembly jar to maven for convenience. I suspect there is some mistake in the release procedure for the official release.

Yes, you are correct that the assembly contains all of the modules :) But I am not sure: if the app wants to build itself as an assembly including the dependent Spark modules, can it do so in that case?

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Thursday, December 18, 2014 5:23 PM
To: Sun, Rui
Cc: shiva...@eecs.berkeley.edu; user@spark.apache.org
Subject: Re: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary

Well, it's always a good idea to use matched binary versions; here it is more acutely necessary. You can use a pre-built binary -- if you use it to compile and also to run. Why does it not make sense to publish artifacts? Not sure what you mean about core vs assembly, as the assembly contains all of the modules. You don't literally need the same jar file.
Re: java.io.NotSerializableException: org.apache.avro.mapred.AvroKey using spark with avro
Hi, I had the same problem. One option (starting with Spark 1.2, which is currently in preview) is to use the Avro library for Spark SQL. The other is to use Kryo serialization. By default Spark uses Java serialization; you can specify Kryo serialization while creating the Spark context:

val conf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

This worked for me. Regards, Anish
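[If you go the Kryo route, it can also help to register the generated Avro classes so they serialize compactly. A sketch under the assumption that MyAvroRecord stands in for your own record class; registerKryoClasses is available from Spark 1.2, while earlier versions use a spark.kryo.registrator instead:

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyAvroRecord]))   // MyAvroRecord is a placeholder
val sc = new SparkContext(conf)
]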
Re: Incorrect results when calling collect() ?
Recording the outcome here for the record. Based on Sean's advice I've confirmed that making defensive copies of records that will be collected avoids this problem - it does seem like Avro is being a bit too aggressive when deciding it's safe to reuse an object for a new record.

On 18 December 2014 at 21:50, Sean Owen so...@cloudera.com wrote: Being mutable is fine; reusing and mutating the objects is the issue. And yes, the objects you get back from Hadoop are reused by Hadoop InputFormats. You should just map the objects to a clone before using them where you need them to exist all independently at once, like before a collect().
Re: No disk single pass RDD aggregation
Hi, This was all my fault. It turned out I had a line of code buried in a library that did a repartition. I used this library to wrap an RDD to present it to legacy code as a different interface. That's what was causing the data to spill to disk. The really stupid thing is it took me the better part of a day to find and several misguided emails to this list (including the one that started this thread). Sorry about that. Jim
pyspark 1.1.1 on windows saveAsTextFile - NullPointerException
Hi, I'm trying to use pyspark to save a simple RDD to a text file (code below), but it keeps throwing an error.

--- Python Code ---
items = ["Hello", "world"]
items2 = sc.parallelize(items)
items2.coalesce(1).saveAsTextFile('c:/tmp/python_out.csv')

--- Error ---
C:\Python27\python.exe C:/Users/Mark Jones/PycharmProjects/spark_test/spark_error_sample.py
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/12/18 13:00:53 INFO SecurityManager: Changing view acls to: Mark Jones,
14/12/18 13:00:53 INFO SecurityManager: Changing modify acls to: Mark Jones,
14/12/18 13:00:53 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Mark Jones, ); users with modify permissions: Set(Mark Jones, )
14/12/18 13:00:53 INFO Slf4jLogger: Slf4jLogger started
14/12/18 13:00:53 INFO Remoting: Starting remoting
14/12/18 13:00:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.19.83:54548]
14/12/18 13:00:53 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.19.83:54548]
14/12/18 13:00:53 INFO Utils: Successfully started service 'sparkDriver' on port 54548.
14/12/18 13:00:53 INFO SparkEnv: Registering MapOutputTracker
14/12/18 13:00:53 INFO SparkEnv: Registering BlockManagerMaster
14/12/18 13:00:53 INFO DiskBlockManager: Created local directory at C:\Users\MARKJO~1\AppData\Local\Temp\spark-local-20141218130053-1ab9
14/12/18 13:00:53 INFO Utils: Successfully started service 'Connection manager for block manager' on port 54551.
14/12/18 13:00:53 INFO ConnectionManager: Bound socket to port 54551 with id = ConnectionManagerId(192.168.19.83,54551)
14/12/18 13:00:53 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
14/12/18 13:00:53 INFO BlockManagerMaster: Trying to register BlockManager
14/12/18 13:00:53 INFO BlockManagerMasterActor: Registering block manager 192.168.19.83:54551 with 265.1 MB RAM
14/12/18 13:00:53 INFO BlockManagerMaster: Registered BlockManager
14/12/18 13:00:53 INFO HttpFileServer: HTTP File server directory is C:\Users\MARKJO~1\AppData\Local\Temp\spark-a43340e8-2621-46b8-a44e-8874dd178393
14/12/18 13:00:53 INFO HttpServer: Starting HTTP Server
14/12/18 13:00:54 INFO Utils: Successfully started service 'HTTP file server' on port 54552.
14/12/18 13:00:54 INFO Utils: Successfully started service 'SparkUI' on port 4040.
14/12/18 13:00:54 INFO SparkUI: Started SparkUI at http://192.168.19.83:4040
14/12/18 13:00:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/18 13:00:54 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
    at org.apache.hadoop.util.Shell.clinit(Shell.java:326)
    at org.apache.hadoop.util.StringUtils.clinit(StringUtils.java:76)
    at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
    at org.apache.hadoop.security.Groups.init(Groups.java:77)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
    at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
    at org.apache.spark.deploy.SparkHadoopUtil.init(SparkHadoopUtil.scala:36)
    at org.apache.spark.deploy.SparkHadoopUtil$.init(SparkHadoopUtil.scala:109)
    at org.apache.spark.deploy.SparkHadoopUtil$.clinit(SparkHadoopUtil.scala)
    at org.apache.spark.SparkContext.init(SparkContext.scala:228)
    at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:214)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
14/12/18 13:00:54 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@192.168.19.83:54548/user/HeartbeatReceiver
14/12/18 13:00:55 INFO
Re: pyspark 1.1.1 on windows saveAsTextFile - NullPointerException
It seems You are missing HADOOP_HOME in the environment. As it says: java.io.IOException: Could not locate executable *null*\bin\winutils.exe in the Hadoop binaries. That null is supposed to be your HADOOP_HOME. Thanks Best Regards On Thu, Dec 18, 2014 at 7:10 PM, mj jone...@gmail.com wrote: Hi, I'm trying to use pyspark to save a simple rdd to a text file (code below), but it keeps throwing an error. - Python Code - items=[Hello, world] items2 = sc.parallelize(items) items2.coalesce(1).saveAsTextFile('c:/tmp/python_out.csv') - Error --C:\Python27\python.exe C:/Users/Mark Jones/PycharmProjects/spark_test/spark_error_sample.py Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/12/18 13:00:53 INFO SecurityManager: Changing view acls to: Mark Jones, 14/12/18 13:00:53 INFO SecurityManager: Changing modify acls to: Mark Jones, 14/12/18 13:00:53 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Mark Jones, ); users with modify permissions: Set(Mark Jones, ) 14/12/18 13:00:53 INFO Slf4jLogger: Slf4jLogger started 14/12/18 13:00:53 INFO Remoting: Starting remoting 14/12/18 13:00:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.19.83:54548] 14/12/18 13:00:53 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.19.83:54548] 14/12/18 13:00:53 INFO Utils: Successfully started service 'sparkDriver' on port 54548. 14/12/18 13:00:53 INFO SparkEnv: Registering MapOutputTracker 14/12/18 13:00:53 INFO SparkEnv: Registering BlockManagerMaster 14/12/18 13:00:53 INFO DiskBlockManager: Created local directory at C:\Users\MARKJO~1\AppData\Local\Temp\spark-local-20141218130053-1ab9 14/12/18 13:00:53 INFO Utils: Successfully started service 'Connection manager for block manager' on port 54551. 14/12/18 13:00:53 INFO ConnectionManager: Bound socket to port 54551 with id = ConnectionManagerId(192.168.19.83,54551) 14/12/18 13:00:53 INFO MemoryStore: MemoryStore started with capacity 265.1 MB 14/12/18 13:00:53 INFO BlockManagerMaster: Trying to register BlockManager 14/12/18 13:00:53 INFO BlockManagerMasterActor: Registering block manager 192.168.19.83:54551 with 265.1 MB RAM 14/12/18 13:00:53 INFO BlockManagerMaster: Registered BlockManager 14/12/18 13:00:53 INFO HttpFileServer: HTTP File server directory is C:\Users\MARKJO~1\AppData\Local\Temp\spark-a43340e8-2621-46b8-a44e-8874dd178393 14/12/18 13:00:53 INFO HttpServer: Starting HTTP Server 14/12/18 13:00:54 INFO Utils: Successfully started service 'HTTP file server' on port 54552. 14/12/18 13:00:54 INFO Utils: Successfully started service 'SparkUI' on port 4040. 14/12/18 13:00:54 INFO SparkUI: Started SparkUI at http://192.168.19.83:4040 14/12/18 13:00:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/12/18 13:00:54 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. 
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333) at org.apache.hadoop.util.Shell.clinit(Shell.java:326) at org.apache.hadoop.util.StringUtils.clinit(StringUtils.java:76) at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93) at org.apache.hadoop.security.Groups.init(Groups.java:77) at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255) at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283) at org.apache.spark.deploy.SparkHadoopUtil.init(SparkHadoopUtil.scala:36) at org.apache.spark.deploy.SparkHadoopUtil$.init(SparkHadoopUtil.scala:109) at org.apache.spark.deploy.SparkHadoopUtil$.clinit(SparkHadoopUtil.scala) at org.apache.spark.SparkContext.init(SparkContext.scala:228) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:408) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:214) at
Downloads from S3 exceedingly slow when running on spark-ec2
I'm running a very simple Spark application that downloads files from S3, does a bit of mapping, then uploads new files. Each file is roughly 2MB and is gzip'd. I was running the same code on Amazon's EMR w/Spark and not having any download speed issues (Amazon's EMR provides a custom implementation of the s3n:// file system, FWIW). When I say exceedingly slow, I mean that it takes about 2 minutes to download and process a 2MB file (this was taking ~2 seconds on the same instance types in Amazon's EMR). When I download the same file from the EC2 machine with wget or curl, it downloads in ~ 1 second. I've also done other bandwidth checks for downloads from other external hosts - no speed problems there. Tried this w/Spark 1.1.0 and 1.1.1. When I do a thread dump on a worker, I typically see this a lot: Executor task launch worker-7 daemon prio=10 tid=0x7fd174039000 nid=0x59e9 runnable [0x7fd1f7dfb000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:152) at java.net.SocketInputStream.read(SocketInputStream.java:122) at sun.security.ssl.InputRecord.readFully(InputRecord.java:442) at sun.security.ssl.InputRecord.read(InputRecord.java:480) at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927) - locked 0x0007e44dd140 (a java.lang.Object) at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884) at sun.security.ssl.AppInputStream.read(AppInputStream.java:102) - locked 0x0007e44e1350 (a sun.security.ssl.AppInputStream) at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) at java.io.BufferedInputStream.read(BufferedInputStream.java:254) - locked 0x0007e44ea800 (a java.io.BufferedInputStream) at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78) at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106) at org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116) at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413) at org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973) at org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735) at org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098) at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398) at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171) at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397) at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323) at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342) at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718) at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599) at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535) at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987) at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332) at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111) at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at 
java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59) at org.apache.hadoop.fs.s3native.$Proxy6.retrieveMetadata(Unknown Source) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:330) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdir(NativeS3FileSystem.java:432) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdirs(NativeS3FileSystem.java:425) at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126) at org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:256) at org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244) at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:126) at org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.getBaseRecordWriter(MultipleTextOutputFormat.java:44) at org.apache.hadoop.mapred.lib.MultipleOutputFormat$1.write(MultipleOutputFormat.java:99) at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:94) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:986) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974) at
Re: Help with updateStateByKey
Hi Pierce, You shouldn’t have to use groupByKey because updateStateByKey will get a Seq of all the values for that key already. I used that for realtime sessionization as well. What I did was key my incoming events, then send them to udpateStateByKey. The updateStateByKey function then received a Seq of the events and the Option of the previous state for that key. The sessionization code then did its thing to check if the incoming events were part of the same session, based on a configured timeout. If a session already was active (from the previous state) and it hadn’t exceeded the timeout, it used that value. Otherwise it generated a new session id. Then the return value for the updateStateByKey function was a Tuple of session id and last timestamp. Then I joined the DStream with the session ids, which were both keyed off the same id and continued my processing. Your requirements may be different, but that’s what worked well for me. Another thing to consider is cleaning up old sessions by returning None in the updateStateByKey function. This will help with long running apps and minimize memory usage (and checkpoint size). I was using something similar to the method above on a live production stream with very little CPU and memory footprint, running for weeks at a time, processing up to 15M events per day with fluctuating traffic. Thanks, Silvio On 12/17/14, 10:07 PM, Pierce Lamb richard.pierce.l...@gmail.com wrote: I am trying to run stateful Spark Streaming computations over (fake) apache web server logs read from Kafka. The goal is to sessionize the web traffic similar to this blog post: http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionizat ion-with-spark-streaming-and-apache-hadoop/ The only difference is that I want to sessionize each page the IP hits, instead of the entire session. I was able to do this reading from a file of fake web traffic using Spark in batch mode, but now I want to do it in a streaming context. Log files are read from Kafka and parsed into K/V pairs of (String, (String, Long, Long)) or (IP, (requestPage, time, time)) I then call groupByKey() on this K/V pair. In batch mode, this would produce a: (String, CollectionBuffer((String, Long, Long), ...) or (IP, CollectionBuffer((requestPage, time, time), ...) In a StreamingContext, it produces a: (String, ArrayBuffer((String, Long, Long), ...) like so: (183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000))) However, as the next microbatch (DStream) arrives, this information is discarded. Ultimately what I want is for that ArrayBuffer to fill up over time as a given IP continues to interact and to run some computations on its data to sessionize the page time. I believe the operator to make that happen is updateStateByKey. I'm having some trouble with this operator (I'm new to both Spark Scala); any help is appreciated. Thus far: val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey) def updateGroupByKey( a: Seq[(String, ArrayBuffer[(String, Long, Long)])], b: Option[(String, ArrayBuffer[(String, Long, Long)])] ): Option[(String, ArrayBuffer[(String, Long, Long)])] = { } - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
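For reference, a minimal sketch of the sessionization pattern described above, assuming a hypothetical Event(page, time) type, a 30-minute timeout and per-IP keying (none of this is Silvio's actual code):

  case class Event(page: String, time: Long)
  val sessionTimeoutMs = 30 * 60 * 1000L

  // State per key is (sessionId, lastSeenTimestamp); returning None drops the key's state.
  def updateSession(newEvents: Seq[Event], state: Option[(String, Long)]): Option[(String, Long)] = {
    val now = System.currentTimeMillis()
    if (newEvents.nonEmpty) {
      // Reuse the session id if the previous state is still within the timeout, otherwise start a new session
      val sessionId = state match {
        case Some((id, lastSeen)) if now - lastSeen <= sessionTimeoutMs => id
        case _ => java.util.UUID.randomUUID.toString
      }
      Some((sessionId, now))
    } else {
      // No new events for this key: keep the state until it times out, then clean it up
      state.filter { case (_, lastSeen) => now - lastSeen <= sessionTimeoutMs }
    }
  }

  // keyedEvents: DStream[(String, Event)] keyed by IP; requires ssc.checkpoint(...) to be set
  // val sessions = keyedEvents.updateStateByKey(updateSession _)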
Spark 1.2 Release Date
Is there a planned release date for Spark 1.2? I saw on the Spark Wiki https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage that we are already in the latter part of the release window. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Release-Date-tp20765.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.2 Release Date
It’s on Maven Central already http://search.maven.org/#browse%7C717101892 On 12/18/14, 2:09 PM, Al M alasdair.mcbr...@gmail.com wrote: Is there a planned release date for Spark 1.2? I saw on the Spark Wiki https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage that we are already in the latter part of the release window. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Release-Date -tp20765.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
EC2 VPC script
Hi guys. I run the following command to launch a new cluster: ./spark-ec2 -k test -i test.pem -s 1 --vpc-id vpc-X --subnet-id subnet-X launch vpc_spark The instances started ok but the command never ends, with the following output: Setting up security groups... Searching for existing cluster vpc_spark... Spark AMI: ami-5bb18832 Launching instances... Launched 1 slaves in us-east-1a, regid = r-e9d603c4 Launched master in us-east-1a, regid = r-89d104a4 Waiting for cluster to enter 'ssh-ready' state... Any ideas what happened? regards Eduardo
Re: Spark 1.2 Release Date
Soon enough :) http://apache-spark-developers-list.1001551.n3.nabble.com/RESULT-VOTE-Release-Apache-Spark-1-2-0-RC2-td9815.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Release-Date-tp20765p20766.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Effects problems in logistic regression
Hi all! I have a problem with LogisticRegressionWithSGD. When I train a data set with one variable (which is the amount of an item) and an intercept, I get weights of (-0.4021,-207.1749) for the two features, respectively. This doesn't make sense to me because I run a logistic regression on the same data in SAS and get these weights: (-2.6604,0.000245). The range of this variable is from 0 to 59102 with a mean of 1158. The problem is that when I want to calculate the probabilities for each user in the data set, the probability is near zero or zero in many cases, because when Spark calculates exp(-1*(-0.4021+(-207.1749)*amount)) this is a big number, in fact infinity for Spark. How can I treat this variable, or why did this happen? Thanks, Franco Barrientos Data Scientist Málaga #115, Of. 1003, Las Condes. Santiago, Chile. (+562)-29699649 (+569)-76347893 franco.barrien...@exalitica.com www.exalitica.com
Standalone Spark program
Hi, I am building a Spark-based service which requires initialization of a SparkContext in a main(): def main(args: Array[String]) { val conf = new SparkConf(false) .setMaster("spark://foo.example.com:7077") .setAppName("foobar") val sc = new SparkContext(conf) val rdd = sc.parallelize(0 until 255) val res = rdd.mapPartitions(it => it).take(1) println(s"res=$res") sc.stop() } This code works fine via the REPL, but not as a standalone program; it causes a ClassNotFoundException. This has me confused about how code is shipped out to executors. When used via the REPL, does the mapPartitions closure, it => it, get sent out when the REPL statement is executed? When this code is run as a standalone program (not via spark-submit), is the compiled code expected to be present at the executor? Thanks, Akshat
Re: Spark 1.2 Release Date
Awesome. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Release-Date-tp20765p20767.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Effects problems in logistic regression
Are you sure this is an apples-to-apples comparison? for example does your SAS process normalize or otherwise transform the data first? Is the optimization configured similarly in both cases -- same regularization, etc.? Are you sure you are pulling out the intercept correctly? It is a separate value from the logistic regression model in Spark. On Thu, Dec 18, 2014 at 4:34 PM, Franco Barrientos franco.barrien...@exalitica.com wrote: Hi all!, I have a problem with LogisticRegressionWithSGD, when I train a data set with one variable (wich is a amount of an item) and intercept, I get weights of (-0.4021,-207.1749) for both features, respectively. This don´t make sense to me because I run a logistic regression for the same data in SAS and I get these weights (-2.6604,0.000245). The rank of this variable is from 0 to 59102 with a mean of 1158. The problem is when I want to calculate the probabilities for each user from data set, this probability is near to zero or zero in much cases, because when spark calculates exp(-1*(-0.4021+(-207.1749)*amount)) this is a big number, in fact infinity for spark. How can I treat this variable? or why this happened? Thanks , *Franco Barrientos* Data Scientist Málaga #115, Of. 1003, Las Condes. Santiago, Chile. (+562)-29699649 (+569)-76347893 franco.barrien...@exalitica.com www.exalitica.com [image: http://exalitica.com/web/img/frim.png]
Re: Standalone Spark program
You can build a jar of your project and add it to the sparkContext (sc.addJar(/path/to/your/project.jar)) then it will get shipped to the worker and hence no classNotfoundException! Thanks Best Regards On Thu, Dec 18, 2014 at 10:06 PM, Akshat Aranya aara...@gmail.com wrote: Hi, I am building a Spark-based service which requires initialization of a SparkContext in a main(): def main(args: Array[String]) { val conf = new SparkConf(false) .setMaster(spark://foo.example.com:7077) .setAppName(foobar) val sc = new SparkContext(conf) val rdd = sc.parallelize(0 until 255) val res = rdd.mapPartitions(it = it).take(1) println(sres=$res) sc.stop() } This code works fine via REPL, but not as a standalone program; it causes a ClassNotFoundException. This has me confused about how code is shipped out to executors. When using via REPL, does the mapPartitions closure, it=it, get sent out when the REPL statement is executed? When this code is run as a standalone program (not via spark-submit), is the compiled code expected to be present at the the executor? Thanks, Akshat
RE: Effects problems in logistic regression
Thanks I will try. De: DB Tsai [mailto:dbt...@dbtsai.com] Enviado el: jueves, 18 de diciembre de 2014 16:24 Para: Franco Barrientos CC: Sean Owen; user@spark.apache.org Asunto: Re: Effects problems in logistic regression Can you try LogisticRegressionWithLBFGS? I verified that this will be converged to the same result trained by R's glmnet package without regularization. The problem of LogisticRegressionWithSGD is it's very slow in term of converging, and lots of time, it's very sensitive to stepsize which can lead to wrong answer. The regularization logic in MLLib is not entirely correct, and it will penalize the intercept. In general, with really high regularization, all the coefficients will be zeros except the intercept. In logistic regression, the non-zero intercept can be understood as the prior-probability of each class, and in linear regression, this will be the mean of response. I'll have a PR to fix this issue. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Thu, Dec 18, 2014 at 10:50 AM, Franco Barrientos franco.barrien...@exalitica.com mailto:franco.barrien...@exalitica.com wrote: Yes, without the “amounts” variables the results are similiar. When I put other variables its fine. De: Sean Owen [mailto:so...@cloudera.com mailto:so...@cloudera.com ] Enviado el: jueves, 18 de diciembre de 2014 14:22 Para: Franco Barrientos CC: user@spark.apache.org mailto:user@spark.apache.org Asunto: Re: Effects problems in logistic regression Are you sure this is an apples-to-apples comparison? for example does your SAS process normalize or otherwise transform the data first? Is the optimization configured similarly in both cases -- same regularization, etc.? Are you sure you are pulling out the intercept correctly? It is a separate value from the logistic regression model in Spark. On Thu, Dec 18, 2014 at 4:34 PM, Franco Barrientos franco.barrien...@exalitica.com mailto:franco.barrien...@exalitica.com wrote: Hi all!, I have a problem with LogisticRegressionWithSGD, when I train a data set with one variable (wich is a amount of an item) and intercept, I get weights of (-0.4021,-207.1749) for both features, respectively. This don´t make sense to me because I run a logistic regression for the same data in SAS and I get these weights (-2.6604,0.000245). The rank of this variable is from 0 to 59102 with a mean of 1158. The problem is when I want to calculate the probabilities for each user from data set, this probability is near to zero or zero in much cases, because when spark calculates exp(-1*(-0.4021+(-207.1749)*amount)) this is a big number, in fact infinity for spark. How can I treat this variable? or why this happened? Thanks , Franco Barrientos Data Scientist Málaga #115, Of. 1003, Las Condes. Santiago, Chile. (+562)-29699649 tel:%28%2B562%29-29699649 (+569)-76347893 tel:%28%2B569%29-76347893 franco.barrien...@exalitica.com mailto:franco.barrien...@exalitica.com www.exalitica.com http://www.exalitica.com/ http://exalitica.com/web/img/frim.png
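For anyone hitting the same thing, a minimal sketch of the LBFGS route DB Tsai suggests, with the amount feature standardized first (the training RDD name and the scaling step are assumptions, not part of the original thread):

  import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
  import org.apache.spark.mllib.feature.StandardScaler
  import org.apache.spark.mllib.regression.LabeledPoint

  // training: RDD[LabeledPoint] with the raw, wide-ranging "amount" feature
  val scaler = new StandardScaler(withMean = true, withStd = true).fit(training.map(_.features))
  val scaled = training.map(lp => LabeledPoint(lp.label, scaler.transform(lp.features)))

  val model = new LogisticRegressionWithLBFGS()
    .setIntercept(true)
    .run(scaled)

  println(s"weights=${model.weights} intercept=${model.intercept}")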
Re: When will Spark SQL support building DB index natively?
It is implemented in the same way as Hive and interoperates with the hive metastore. In 1.2 we are considering adding partitioning to the SparkSQL data source API as well.. However, for now, you should create a hive context and a partitioned table. Spark SQL will automatically select partitions when there are predicates in a query against the partitioning columns. On Wed, Dec 17, 2014 at 7:31 PM, Xuelin Cao xuelin...@yahoo.com wrote: Thanks, I didn't try the partitioned table support (sounds like a hive feature) Is there any guideline? Should I use hiveContext to create the table with partition firstly? On Thursday, December 18, 2014 2:28 AM, Michael Armbrust mich...@databricks.com wrote: - Dev list Have you looked at partitioned table support? That would only scan data where the predicate matches the partition. Depending on the cardinality of the customerId column that could be a good option for you. On Wed, Dec 17, 2014 at 2:25 AM, Xuelin Cao xuelin...@yahoo.com.invalid wrote: Hi, In Spark SQL help document, it says Some of these (such as indexes) are less important due to Spark SQL’s in-memory computational model. Others are slotted for future releases of Spark SQL. - Block level bitmap indexes and virtual columns (used to build indexes) For our use cases, DB index is quite important. I have about 300G data in our database, and we always use customer id as a predicate for DB look up. Without DB index, we will have to scan all 300G data, and it will take 1 minute for a simple DB look up, while MySQL only takes 10 seconds. We tried to create an independent table for each customer id, the result is pretty good, but the logic will be very complex. I'm wondering when will Spark SQL supports DB index, and before that, is there an alternative way to support DB index function? Thanks
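A rough sketch of what Michael describes, using a HiveContext and a table partitioned by the lookup column (table and column names here are made up for illustration):

  import org.apache.spark.sql.hive.HiveContext

  val hiveContext = new HiveContext(sc)

  // Partition the table on the column used in lookups
  hiveContext.sql(
    "CREATE TABLE IF NOT EXISTS customer_events (payload STRING) PARTITIONED BY (customer_id STRING)")

  // A predicate on the partitioning column lets Spark SQL scan only the matching partitions
  val rows = hiveContext.sql("SELECT * FROM customer_events WHERE customer_id = '12345'")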
does spark sql support columnar compression with encoding when caching tables
Hi All, Wondering whether, when caching a table backed by lzo-compressed parquet data, Spark also compresses it (using lzo/gzip/snappy) along with column-level encoding, or just does the column-level encoding, when spark.sql.inMemoryColumnarStorage.compressed is set to true. I ask because when I try to cache the data, I notice the memory being used is almost as much as the uncompressed size of the data. Thanks!
Re: Help with updateStateByKey
This produces the expected output, thank you! On Thu, Dec 18, 2014 at 12:11 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Ok, I have a better idea of what you’re trying to do now. I think the prob might be the map. The first time the function runs, currentValue will be None. Using map on None returns None. Instead, try: Some(currentValue.getOrElse(Seq.empty) ++ newValues) I think that should give you the expected result. From: Pierce Lamb richard.pierce.l...@gmail.com Date: Thursday, December 18, 2014 at 2:31 PM To: Silvio Fiorito silvio.fior...@granturing.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: Help with updateStateByKey Hi Silvio, This is a great suggestion (I wanted to get rid of groupByKey), I have been trying to implement it this morning, but having some trouble. I would love to see your code for the function that goes inside updateStateByKey Here is my current code: def updateGroupByKey( newValues: Seq[(String, Long, Long)], currentValue: Option[Seq[(String, Long, Long)]] ): Option[Seq[(String, Long, Long)]] = { currentValue.map{ case (v) = v ++ newValues } } val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey) However, when I run it the grouped DStream doesn't get populated with anything. The issue is probably that currentValue is not actually an Option[Seq[triple]] but rather an Option[triple]. However if I change it to an Option[triple] then I have to also return an Option[triple] for updateStateByKey to compile, but I want that return value to be an Option[Seq[triple]] because ultimately i want the data to look like (IPaddress, Seq[(pageRequested, startTime, EndTime), (pageRequested, startTime, EndTime)...]) and have that Seq build over time Am I thinking about this wrong? Thank you On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito silvio.fior...@granturing.com wrote: Hi Pierce, You shouldn’t have to use groupByKey because updateStateByKey will get a Seq of all the values for that key already. I used that for realtime sessionization as well. What I did was key my incoming events, then send them to udpateStateByKey. The updateStateByKey function then received a Seq of the events and the Option of the previous state for that key. The sessionization code then did its thing to check if the incoming events were part of the same session, based on a configured timeout. If a session already was active (from the previous state) and it hadn’t exceeded the timeout, it used that value. Otherwise it generated a new session id. Then the return value for the updateStateByKey function was a Tuple of session id and last timestamp. Then I joined the DStream with the session ids, which were both keyed off the same id and continued my processing. Your requirements may be different, but that’s what worked well for me. Another thing to consider is cleaning up old sessions by returning None in the updateStateByKey function. This will help with long running apps and minimize memory usage (and checkpoint size). I was using something similar to the method above on a live production stream with very little CPU and memory footprint, running for weeks at a time, processing up to 15M events per day with fluctuating traffic. Thanks, Silvio On 12/17/14, 10:07 PM, Pierce Lamb richard.pierce.l...@gmail.com wrote: I am trying to run stateful Spark Streaming computations over (fake) apache web server logs read from Kafka. 
The goal is to sessionize the web traffic similar to this blog post: http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionizat ion-with-spark-streaming-and-apache-hadoop/ The only difference is that I want to sessionize each page the IP hits, instead of the entire session. I was able to do this reading from a file of fake web traffic using Spark in batch mode, but now I want to do it in a streaming context. Log files are read from Kafka and parsed into K/V pairs of (String, (String, Long, Long)) or (IP, (requestPage, time, time)) I then call groupByKey() on this K/V pair. In batch mode, this would produce a: (String, CollectionBuffer((String, Long, Long), ...) or (IP, CollectionBuffer((requestPage, time, time), ...) In a StreamingContext, it produces a: (String, ArrayBuffer((String, Long, Long), ...) like so: (183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000))) However, as the next microbatch (DStream) arrives, this information is discarded. Ultimately what I want is for that ArrayBuffer to fill up over time as a given IP continues to interact and to run some computations on its data to sessionize the page time. I believe the operator to make that happen is updateStateByKey. I'm having some trouble with this operator (I'm new to both Spark Scala); any help is appreciated. Thus far: val grouped =
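Putting Silvio's fix into the function above, the working version looks roughly like this:

  def updateGroupByKey(
      newValues: Seq[(String, Long, Long)],
      currentValue: Option[Seq[(String, Long, Long)]]): Option[Seq[(String, Long, Long)]] = {
    // getOrElse covers the first batch, when there is no previous state (None) for the key yet
    Some(currentValue.getOrElse(Seq.empty) ++ newValues)
  }

  val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey _)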
Re: Help with updateStateByKey
Great, glad it worked out! Just keep an eye on memory usage as you roll it out. Like I said before, if you’ll be running this 24/7 consider cleaning up sessions by returning None after some sort of timeout. On 12/18/14, 8:25 PM, Pierce Lamb richard.pierce.l...@gmail.com wrote: This produces the expected output, thank you! On Thu, Dec 18, 2014 at 12:11 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Ok, I have a better idea of what you’re trying to do now. I think the prob might be the map. The first time the function runs, currentValue will be None. Using map on None returns None. Instead, try: Some(currentValue.getOrElse(Seq.empty) ++ newValues) I think that should give you the expected result. From: Pierce Lamb richard.pierce.l...@gmail.com Date: Thursday, December 18, 2014 at 2:31 PM To: Silvio Fiorito silvio.fior...@granturing.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: Help with updateStateByKey Hi Silvio, This is a great suggestion (I wanted to get rid of groupByKey), I have been trying to implement it this morning, but having some trouble. I would love to see your code for the function that goes inside updateStateByKey Here is my current code: def updateGroupByKey( newValues: Seq[(String, Long, Long)], currentValue: Option[Seq[(String, Long, Long)]] ): Option[Seq[(String, Long, Long)]] = { currentValue.map{ case (v) = v ++ newValues } } val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey) However, when I run it the grouped DStream doesn't get populated with anything. The issue is probably that currentValue is not actually an Option[Seq[triple]] but rather an Option[triple]. However if I change it to an Option[triple] then I have to also return an Option[triple] for updateStateByKey to compile, but I want that return value to be an Option[Seq[triple]] because ultimately i want the data to look like (IPaddress, Seq[(pageRequested, startTime, EndTime), (pageRequested, startTime, EndTime)...]) and have that Seq build over time Am I thinking about this wrong? Thank you On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito silvio.fior...@granturing.com wrote: Hi Pierce, You shouldn’t have to use groupByKey because updateStateByKey will get a Seq of all the values for that key already. I used that for realtime sessionization as well. What I did was key my incoming events, then send them to udpateStateByKey. The updateStateByKey function then received a Seq of the events and the Option of the previous state for that key. The sessionization code then did its thing to check if the incoming events were part of the same session, based on a configured timeout. If a session already was active (from the previous state) and it hadn’t exceeded the timeout, it used that value. Otherwise it generated a new session id. Then the return value for the updateStateByKey function was a Tuple of session id and last timestamp. Then I joined the DStream with the session ids, which were both keyed off the same id and continued my processing. Your requirements may be different, but that’s what worked well for me. Another thing to consider is cleaning up old sessions by returning None in the updateStateByKey function. This will help with long running apps and minimize memory usage (and checkpoint size). I was using something similar to the method above on a live production stream with very little CPU and memory footprint, running for weeks at a time, processing up to 15M events per day with fluctuating traffic. 
Thanks, Silvio On 12/17/14, 10:07 PM, Pierce Lamb richard.pierce.l...@gmail.com wrote: I am trying to run stateful Spark Streaming computations over (fake) apache web server logs read from Kafka. The goal is to sessionize the web traffic similar to this blog post: http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessioni zat ion-with-spark-streaming-and-apache-hadoop/ The only difference is that I want to sessionize each page the IP hits, instead of the entire session. I was able to do this reading from a file of fake web traffic using Spark in batch mode, but now I want to do it in a streaming context. Log files are read from Kafka and parsed into K/V pairs of (String, (String, Long, Long)) or (IP, (requestPage, time, time)) I then call groupByKey() on this K/V pair. In batch mode, this would produce a: (String, CollectionBuffer((String, Long, Long), ...) or (IP, CollectionBuffer((requestPage, time, time), ...) In a StreamingContext, it produces a: (String, ArrayBuffer((String, Long, Long), ...) like so: (183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000))) However, as the next microbatch (DStream) arrives, this information is discarded. Ultimately what I want is for that ArrayBuffer to fill up over time as a given IP continues to
UNION two RDDs
Hi Spark users, I wonder if val resultRDD = RDDA.union(RDDB) will always have records in RDDA before records in RDDB. Also, will resultRDD.coalesce(1) change this ordering? Best Regards, Jerry
Re: Standalone Spark program
Hey Akshat, What is the class that is not found, is it a Spark class or classes that you define in your own application? If the latter, then Akhil's solution should work (alternatively you can also pass the jar through the --jars command line option in spark-submit). If it's a Spark class, however, it's likely that the Spark assembly jar is not present on the worker nodes. When you build Spark on the cluster, you will need to rsync it to the same path on all the nodes in your cluster. For more information, see http://spark.apache.org/docs/latest/spark-standalone.html. -Andrew 2014-12-18 10:29 GMT-08:00 Akhil Das ak...@sigmoidanalytics.com: You can build a jar of your project and add it to the sparkContext (sc.addJar(/path/to/your/project.jar)) then it will get shipped to the worker and hence no classNotfoundException! Thanks Best Regards On Thu, Dec 18, 2014 at 10:06 PM, Akshat Aranya aara...@gmail.com wrote: Hi, I am building a Spark-based service which requires initialization of a SparkContext in a main(): def main(args: Array[String]) { val conf = new SparkConf(false) .setMaster(spark://foo.example.com:7077) .setAppName(foobar) val sc = new SparkContext(conf) val rdd = sc.parallelize(0 until 255) val res = rdd.mapPartitions(it = it).take(1) println(sres=$res) sc.stop() } This code works fine via REPL, but not as a standalone program; it causes a ClassNotFoundException. This has me confused about how code is shipped out to executors. When using via REPL, does the mapPartitions closure, it=it, get sent out when the REPL statement is executed? When this code is run as a standalone program (not via spark-submit), is the compiled code expected to be present at the the executor? Thanks, Akshat
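For completeness, a small sketch of wiring the application jar into the SparkConf so the closure classes reach the executors (the jar path is a placeholder):

  val conf = new SparkConf(false)
    .setMaster("spark://foo.example.com:7077")
    .setAppName("foobar")
    // Ship the application's own classes (including the it => it closure) to the executors
    .setJars(Seq("/path/to/your/project.jar"))
  val sc = new SparkContext(conf)

  // Alternatively, after the context is created:
  // sc.addJar("/path/to/your/project.jar")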
Spark GraphX question.
Hi All, I am wondering what is the best way to remove transitive edges with maximum spanning tree. For example, Edges: 1 - 2 (30) 2 - 3 (30) 1 - 3 (25) where parenthesis is a weight for each edge. Then, I'd like to get the reduced edges graph after Transitive Reduction with considering the weight as a maximum spanning tree. Edges: 1 - 2 (30) 2 - 3 (30) Do you have a good idea for this? Thanks, Ted -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-GraphX-question-tp20768.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Creating a smaller, derivative RDD from an RDD
We have a very large RDD and I need to create a new RDD whose values are derived from each record of the original RDD, and we only retain the few new records that meet a criteria. I want to avoid creating a second large RDD and then filtering it since I believe this could tax system resources unnecessarily (tell me if that assumption is wrong.) So for example, /and this is just an example/, say we have an RDD with 1 to 1,000,000 and we iterate through each value, and compute it's md5 hash, and we only keep the results that start with 'A'. What we've tried and seems to work but which seemed a bit ugly, and perhaps not efficient, was the following in pseudocode. * Is this the best way to do this?* Thanks bigRdd.flatMap( { i = val h = md5(i) if (h.substring(1,1) == 'A') { Array(h) } else { Array[String]() } }) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-a-smaller-derivative-RDD-from-an-RDD-tp20769.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Control default partition when load a RDD from HDFS
Hmmm, how to do that? You mean for each file create a RDD? Then I will have tons of RDD. And my calculation need to rely on other input, not just the file itself Can you show some pseudo code for that logic? Regards, Shuai From: Diego García Valverde [mailto:dgarci...@agbar.es] Sent: Wednesday, December 17, 2014 11:04 AM To: Shuai Zheng; 'Sun, Rui'; user@spark.apache.org Subject: RE: Control default partition when load a RDD from HDFS Why not is a good option to create a RDD per each 200Mb file and then apply the pre-calculations before merging them? I think the partitions per RDD must be transparent to the pre-calculations, and not to set them fixed to optimize the spark maps/reduces processes. De: Shuai Zheng [mailto:szheng.c...@gmail.com] Enviado el: miércoles, 17 de diciembre de 2014 16:01 Para: 'Sun, Rui'; user@spark.apache.org Asunto: RE: Control default partition when load a RDD from HDFS Nice, that is the answer I want. Thanks! From: Sun, Rui [mailto:rui@intel.com] Sent: Wednesday, December 17, 2014 1:30 AM To: Shuai Zheng; user@spark.apache.org Subject: RE: Control default partition when load a RDD from HDFS Hi, Shuai, How did you turn off the file split in Hadoop? I guess you might have implemented a customized FileInputFormat which overrides isSplitable() to return FALSE. If you do have such FileInputFormat, you can simply pass it as a constructor parameter to HadoopRDD or NewHadoopRDD in Spark. From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, December 17, 2014 4:16 AM To: user@spark.apache.org Subject: Control default partition when load a RDD from HDFS Hi All, My application load 1000 files, each file from 200M a few GB, and combine with other data to do calculation. Some pre-calculation must be done on each file level, then after that, the result need to combine to do further calculation. In Hadoop, it is simple because I can turn-off the file split for input format (to enforce each file will go to same mapper), then I will do the file level calculation in mapper and pass result to reducer. But in spark, how can I do it? Basically I want to make sure after I load these files into RDD, it is partitioned by file (not split file and also no merge there), so I can call mapPartitions. Is it any way I can control the default partition when I load the RDD? This might be the default behavior that spark do the partition (partitioned by file when first time load the RDD), but I cant find any document to support my guess, if not, can I enforce this kind of partition? Because the total file size is bigger, I dont want to re-partition in the code. Regards, Shuai _ Disclaimer: http://disclaimer.agbar.com
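A minimal sketch of the approach Rui describes, i.e. an input format that never splits files, so each file becomes exactly one partition (class name, key/value types and paths are assumptions):

  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.TextInputFormat

  // A TextInputFormat that refuses to split files
  class WholeFileTextInputFormat extends TextInputFormat {
    override protected def isSplitable(fs: FileSystem, file: Path): Boolean = false
  }

  // Each of the ~1000 files now lands in its own partition
  val rdd = sc.hadoopFile[LongWritable, Text, WholeFileTextInputFormat]("hdfs:///path/to/files/*", 1)

  // Per-file pre-calculation can then run inside mapPartitions
  val perFile = rdd.mapPartitions { lines =>
    // ... file-level calculation here ...
    lines
  }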
Re: hello
You mean the Spark User List? It's pretty easy; check the first email, it has all the instructions. On 18 December 2014 at 21:56, csjtx1021 [via Apache Spark User List] ml-node+s1001560n20759...@n3.nabble.com wrote: i want to join you -- Regards, Harihar Nahak BigData Developer Wynyard Email:hna...@wynyardgroup.com | Extn: 8019 --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/hello-tp20759p20770.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark GraphX question.
Hi Ted, I'm not familiar with transitive reduction, but you can get the expected result with the graph.subgraph(...) API, filtering edges by their weight, which gives you a new graph that satisfies your condition. On 19 December 2014 at 11:11, Tae-Hyuk Ahn [via Apache Spark User List] ml-node+s1001560n20768...@n3.nabble.com wrote: Hi All, I am wondering what is the best way to remove transitive edges with maximum spanning tree. For example, Edges: 1 - 2 (30) 2 - 3 (30) 1 - 3 (25) where parenthesis is a weight for each edge. Then, I'd like to get the reduced edges graph after Transitive Reduction with considering the weight as a maximum spanning tree. Edges: 1 - 2 (30) 2 - 3 (30) Do you have a good idea for this? Thanks, Ted -- Regards, Harihar Nahak BigData Developer Wynyard Email:hna...@wynyardgroup.com | Extn: 8019 --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-GraphX-question-tp20768p20771.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
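For what it's worth, a tiny sketch of that edge-filtering idea; note it only drops edges below a weight threshold, it is not a full maximum-spanning-tree or transitive-reduction computation (the threshold and the Double edge attribute are assumptions):

  import org.apache.spark.graphx._

  // graph: Graph[VD, Double] with the weight stored as the edge attribute
  val weightThreshold = 30.0
  val reduced = graph.subgraph(epred = triplet => triplet.attr >= weightThreshold)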
How to increase parallelism in Yarn
Hi, I am using Spark 1.1.1 on Yarn. When I try to run K-Means, I see from the Yarn dashboard that only 3 containers are being used. How do I increase the number of containers used? P.S: When I run K-Means on Mahout with the same settings, I see that there are 25-30 containers being used. Thanks, Suman.
Re: Creating a smaller, derivative RDD from an RDD
I don't think you can avoid examining each element of the RDD, if that's what you mean. Your approach is basically the best you can do in general. You're not making a second RDD here, and even if you did this in two steps, the second RDD is really more of a bookkeeping that a second huge data structure. You can simplify your example a bit, although I doubt it's noticeably faster: bigRdd.flatMap { i = val h = md5(i) if (h(0) == 'A') { Some(h) } else { None } } This is also fine, simpler still, and if it's slower, not by much: bigRdd.map(md5).filter(_(0) == 'A') On Thu, Dec 18, 2014 at 10:18 PM, bethesda swearinge...@mac.com wrote: We have a very large RDD and I need to create a new RDD whose values are derived from each record of the original RDD, and we only retain the few new records that meet a criteria. I want to avoid creating a second large RDD and then filtering it since I believe this could tax system resources unnecessarily (tell me if that assumption is wrong.) So for example, /and this is just an example/, say we have an RDD with 1 to 1,000,000 and we iterate through each value, and compute it's md5 hash, and we only keep the results that start with 'A'. What we've tried and seems to work but which seemed a bit ugly, and perhaps not efficient, was the following in pseudocode. * Is this the best way to do this?* Thanks bigRdd.flatMap( { i = val h = md5(i) if (h.substring(1,1) == 'A') { Array(h) } else { Array[String]() } }) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-a-smaller-derivative-RDD-from-an-RDD-tp20769.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to increase parallelism in Yarn
Hi Suman, I'll assume that you are using spark submit to run your application. You can pass the --num-executors flag to ask for more containers. If you want to allocate more memory for each executor, you may also pass in the --executor-memory flag (this accepts a string in the format 1g, 512m etc.). -Andrew 2014-12-18 14:37 GMT-08:00 Suman Somasundar suman.somasun...@oracle.com: Hi, I am using Spark 1.1.1 on Yarn. When I try to run K-Means, I see from the Yarn dashboard that only 3 containers are being used. How do I increase the number of containers used? P.S: When I run K-Means on Mahout with the same settings, I see that there are 25-30 containers being used. Thanks, Suman.
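Concretely, that would look something like the following spark-submit invocation (class name, jar and the numbers are placeholders to adjust for your cluster):

  ./bin/spark-submit --master yarn-cluster \
    --num-executors 25 \
    --executor-memory 4g \
    --class com.example.KMeansJob \
    my-kmeans-app.jar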
Re: MLLib /ALS : java.lang.OutOfMemoryError: Java heap space
Hi Jay, Please try increasing executor memory (if the available memory is more than 2GB) and reduce numBlocks in ALS. The current implementation stores all subproblems in memory and hence the memory requirement is significant when k is large. You can also try reducing k and see whether the problem is still there. I made a PR that improves the ALS implementation, which generates subproblems one by one. You can try that as well. https://github.com/apache/spark/pull/3720 Best, Xiangrui On Wed, Dec 17, 2014 at 6:57 PM, buring qyqb...@gmail.com wrote: I am not sure this can help you. I have 57 million rating,about 4million user and 4k items. I used 7-14 total-executor-cores,executal-memory 13g,cluster have 4 nodes,each have 4cores,max memory 16g. I found set as follows may help avoid this problem: conf.set(spark.shuffle.memoryFraction,0.65) //default is 0.2 conf.set(spark.storage.memoryFraction,0.3)//default is 0.6 I have to set rank value under 40, otherwise occure this problem. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-ALS-java-lang-OutOfMemoryError-Java-heap-space-tp20584p20755.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
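As a concrete starting point, a hedged sketch of the knobs Xiangrui mentions (the actual numbers are illustrative, not recommendations):

  import org.apache.spark.mllib.recommendation.{ALS, Rating}

  // ratings: RDD[Rating]
  val rank = 20        // try a smaller rank if the executors keep running out of memory
  val iterations = 10
  val lambda = 0.01
  val numBlocks = 8    // number of user/product blocks; experiment with this per the advice above

  val model = ALS.train(ratings, rank, iterations, lambda, numBlocks)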
RE: Do I need to applied feature scaling via StandardScaler for LBFGS for Linear Regression?
Thanks dbtsai for the info. Are you using the case class for: Case(response, vec) = ? Also, what library do I need to import to use .toBreeze ? Thanks, tri -Original Message- From: dbt...@dbtsai.com [mailto:dbt...@dbtsai.com] Sent: Friday, December 12, 2014 3:27 PM To: Bui, Tri Cc: user@spark.apache.org Subject: Re: Do I need to applied feature scaling via StandardScaler for LBFGS for Linear Regression? You can do something like the following. val rddVector = input.map({ case (response, vec) = { val newVec = MLUtils.appendBias(vec) newVec.toBreeze(newVec.size - 1) = response newVec } } val scalerWithResponse = new StandardScaler(true, true).fit(rddVector) val trainingData = scalerWithResponse.transform(rddVector).map(x= { (x(x.size - 1), Vectors.dense(x.toArray.slice(0, x.size -1)) }) Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 12, 2014 at 12:23 PM, Bui, Tri tri@verizonwireless.com wrote: Thanks for the info. How do I use StandardScaler() to scale example data (10246.0,[14111.0,1.0]) ? Thx tri -Original Message- From: dbt...@dbtsai.com [mailto:dbt...@dbtsai.com] Sent: Friday, December 12, 2014 1:26 PM To: Bui, Tri Cc: user@spark.apache.org Subject: Re: Do I need to applied feature scaling via StandardScaler for LBFGS for Linear Regression? It seems that your response is not scaled which will cause issue in LBFGS. Typically, people train Linear Regression with zero-mean/unit-variable feature and response without training the intercept. Since the response is zero-mean, the intercept will be always zero. When you convert the coefficients to the oringal space from the scaled space, the intercept can be computed by w0 = y - \sum x_n w_n where x_n is the average of column n. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 12, 2014 at 10:49 AM, Bui, Tri tri@verizonwireless.com wrote: Thanks for the confirmation. Fyi..The code below works for similar dataset, but with the feature magnitude changed, LBFGS converged to the right weights. Example, time sequential Feature value 1, 2, 3, 4, 5, would generate the error while sequential feature 14111, 14112, 14113,14115 would converge to the right weight. Why? Below is code to implement standardscaler() for sample data (10246.0,[14111.0,1.0])): val scaler1 = new StandardScaler().fit(train.map(x = x.features)) val train1 = train.map(x = (x.label, scaler1.transform(x.features))) But I keeps on getting error: value features is not a member of (Double, org.apache.spark.mllib.linalg.Vector) Should my feature vector be .toInt instead of Double? Also, the error org.apache.spark.mllib.linalg.Vector should have an s to match import library org.apache.spark.mllib.linalg.Vectors Thanks Tri -Original Message- From: dbt...@dbtsai.com [mailto:dbt...@dbtsai.com] Sent: Friday, December 12, 2014 12:16 PM To: Bui, Tri Cc: user@spark.apache.org Subject: Re: Do I need to applied feature scaling via StandardScaler for LBFGS for Linear Regression? You need to do the StandardScaler to help the convergency yourself. LBFGS just takes whatever objective function you provide without doing any scaling. I will like to provide LinearRegressionWithLBFGS which does the scaling internally in the nearly feature. 
Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 12, 2014 at 8:49 AM, Bui, Tri tri@verizonwireless.com.invalid wrote: Hi, Trying to use LBFGS as the optimizer, do I need to implement feature scaling via StandardScaler or does LBFGS do it by default? Following code generated error “ Failure again! Giving up and returning, Maybe the objective is just poorly behaved ?”. val data = sc.textFile(file:///data/Train/final2.train) val parsedata = data.map { line = val partsdata = line.split(',') LabeledPoint(partsdata(0).toDouble, Vectors.dense(partsdata(1).split(' ').map(_.toDouble))) } val train = parsedata.map(x = (x.label, MLUtils.appendBias(x.features))).cache() val numCorrections = 10 val convergenceTol = 1e-4 val maxNumIterations = 50 val regParam = 0.1 val initialWeightsWithIntercept = Vectors.dense(new Array[Double](2)) val (weightsWithIntercept, loss) = LBFGS.runLBFGS(train, new LeastSquaresGradient(), new SquaredL2Updater(), numCorrections, convergenceTol, maxNumIterations, regParam, initialWeightsWithIntercept) Did I implement LBFGS for Linear Regression via “LeastSquareGradient()” correctly? Thanks Tri -
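A cleaned-up sketch of the scaling step being discussed, written with plain arrays so it does not need toBreeze (which, as far as I can tell, is not part of MLlib's public API); the input name is assumed to be an RDD[LabeledPoint]:

  import org.apache.spark.mllib.feature.StandardScaler
  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint

  // Append the response as the last element so label and features are scaled together
  val combined = input.map { case LabeledPoint(label, features) =>
    Vectors.dense(features.toArray :+ label)
  }

  val scaler = new StandardScaler(withMean = true, withStd = true).fit(combined)

  // Split the scaled vector back into (response, features); the intercept is zero in this scaled space
  val trainingData = scaler.transform(combined).map { v =>
    val arr = v.toArray
    (arr.last, Vectors.dense(arr.init))
  }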
Sharing sqlContext between Akka router and routee actors ...
Hi, An Akka router creates a sqlContext and creates a bunch of routee actors with the sqlContext as a parameter. The actors then execute queries on that sqlContext. Would this pattern be an issue? Is there any other way sparkContext etc. should be shared cleanly between Akka routers/routees? Thanks,
Re: does spark sql support columnar compression with encoding when caching tables
There is only column-level encoding (run-length encoding, delta encoding, dictionary encoding) and no generic compression. On Thu, Dec 18, 2014 at 12:07 PM, Sadhan Sood sadhan.s...@gmail.com wrote: Hi All, Wondering whether, when caching a table backed by lzo-compressed parquet data, spark also compresses it (using lzo/gzip/snappy) along with the column-level encoding, or just does the column-level encoding, when *spark.sql.inMemoryColumnarStorage.compressed* is set to true. I ask because when I try to cache the data, I notice the memory being used is almost as much as the uncompressed size of the data. Thanks!
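[A minimal sketch of the setting under discussion, assuming a SQLContext named sqlContext and a registered table named events, both of which are illustrative and not from the thread. With the flag on, the in-memory columnar cache applies the per-column encodings listed above but no general-purpose codec such as lzo, gzip, or snappy; how much memory that saves depends on how well the columns encode.]

// Illustrative only; "events" and sqlContext are assumptions, not from the thread.
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

sqlContext.cacheTable("events")                           // marks the table for in-memory columnar caching
sqlContext.sql("SELECT COUNT(*) FROM events").collect()   // first scan materializes the encoded column batches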
Re: Sharing sqlContext between Akka router and routee actors ...
Why do you need a router? I mean, can't you do it with just one actor which has the SQLContext inside it? On Thu, Dec 18, 2014 at 9:45 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi, An Akka router creates a sqlContext and creates a bunch of routee actors with the sqlContext as a parameter. The actors then execute queries on that sqlContext. Would this pattern be an issue? Is there any other way sparkContext etc. should be shared cleanly in Akka routers/routees? Thanks,
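[A minimal sketch of the single-actor alternative suggested above, assuming Akka 2.x and Spark SQL 1.2; QueryActor and the message shape are illustrative names, not from the thread. One actor owns the SQLContext and all queries are funneled through its mailbox, so nothing Spark-related needs to be passed to routees.]

import akka.actor.{Actor, ActorSystem, Props}
import org.apache.spark.sql.SQLContext

// Runs each SQL string it receives on the single SQLContext it owns
// and replies with the collected rows.
class QueryActor(sqlContext: SQLContext) extends Actor {
  def receive = {
    case query: String => sender() ! sqlContext.sql(query).collect()
  }
}

// Usage sketch (sqlContext must already exist):
// val system = ActorSystem("spark-sql")
// val queryActor = system.actorOf(Props(new QueryActor(sqlContext)), "query-actor")
// queryActor ! "SELECT COUNT(*) FROM some_table"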
java.lang.ExceptionInInitializerError/Unable to load YARN support
All, I just built Spark-1.2 on my enterprise server (which has Hadoop 2.3 with YARN). Here're the steps I followed for the build:

$ mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package
$ export SPARK_HOME=/path/to/spark/folder
$ export HADOOP_CONF_DIR=/etc/hadoop/conf

However, when I try to work with this installation either locally or on YARN, I get the following error:

Exception in thread "main" java.lang.ExceptionInInitializerError
  at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1784)
  at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:105)
  at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:180)
  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:292)
  at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:232)
  at water.MyDriver$.main(MyDriver.scala:19)
  at water.MyDriver.main(MyDriver.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:360)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Unable to load YARN support
  at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:199)
  at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:194)
  at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
  ... 15 more
Caused by: java.lang.IllegalArgumentException: Invalid rule: L RULE:[2:$1@$0](.*@XXXCOMPANY.COM)s/(.*)@XXXCOMPANY.COM/$1/L DEFAULT
  at org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
  at org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
  at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
  at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
  at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
  at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:43)
  at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.<init>(YarnSparkHadoopUtil.scala:45)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
  at java.lang.Class.newInstance(Class.java:374)
  at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:196)
  ... 17 more

I noticed that when I unset HADOOP_CONF_DIR, I'm able to work in the local mode without any errors. I'm able to work with pre-installed Spark 1.0, locally and on yarn, without any issues. It looks like I may be missing a configuration step somewhere. Any thoughts on what may be causing this?

NR

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ExceptionInInitializerError-Unable-to-load-YARN-support-tp20775.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
When will spark 1.2 released?
Hi, Does anyone know when spark 1.2 will be released? 1.2 has many great features that we can't wait for ;-) Sincerely, Lin wukang Sent from NetEase Mail Master
Re: SchemaRDD.sample problem
Hi, Can you clean up the code a little bit? It's hard to read what's going on. You can use pastebin or gist to put the code. On Wed, Dec 17, 2014 at 3:58 PM, Hao Ren inv...@gmail.com wrote: Hi, I am using SparkSQL on the 1.2.1 branch. The problem comes from the following 4-line code:

val t1: SchemaRDD = hiveContext hql "select * from product where is_new = 0"
val tb1: SchemaRDD = t1.sample(withReplacement = false, fraction = 0.05)
tb1.registerTempTable("t1_tmp")
(hiveContext sql "select count(*) from t1_tmp where is_new = 1") collect foreach println

We know that t1 contains only rows whose is_new field is zero. After sampling t1 by taking 5% of the rows, the sampled table should normally contain only rows where is_new = 0. However, line 4 gives a number of about 5, varying by chance. That means there are some rows where is_new = 1 in the sampled table, which is not logically possible. I am not sure SchemaRDD.sample is doing its work well. Any idea? Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-sample-problem-tp20741.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
Re: When will spark 1.2 released?
It’s on Maven Central already http://search.maven.org/#browse%7C717101892 On Fri, Dec 19, 2014 at 11:17 AM, vboylin1...@gmail.com vboylin1...@gmail.com wrote: Hi, Dose any know when will spark 1.2 released? 1.2 has many great feature that we can't wait now ,-) Sincely Lin wukang 发自网易邮箱大师 -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
Re: When will spark 1.2 released?
Interesting, the maven artifacts were dated Dec 10th. However vote for RC2 closed recently: http://search-hadoop.com/m/JW1q5K8onk2/Patrick+spark+1.2.0subj=Re+VOTE+Release+Apache+Spark+1+2+0+RC2+ Cheers On Dec 18, 2014, at 10:02 PM, madhu phatak phatak@gmail.com wrote: It’s on Maven Central already http://search.maven.org/#browse%7C717101892 On Fri, Dec 19, 2014 at 11:17 AM, vboylin1...@gmail.com vboylin1...@gmail.com wrote: Hi, Dose any know when will spark 1.2 released? 1.2 has many great feature that we can't wait now ,-) Sincely Lin wukang 发自网易邮箱大师 -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
Re: When will spark 1.2 released?
Patrick is working on the release as we speak -- I expect it'll be out later tonight (US west coast) or tomorrow at the latest. On Fri, Dec 19, 2014 at 1:09 AM, Ted Yu yuzhih...@gmail.com wrote: Interesting, the maven artifacts were dated Dec 10th. However vote for RC2 closed recently: http://search-hadoop.com/m/JW1q5K8onk2/Patrick+spark+1.2.0subj=Re+VOTE+Release+Apache+Spark+1+2+0+RC2+ Cheers On Dec 18, 2014, at 10:02 PM, madhu phatak phatak@gmail.com wrote: It’s on Maven Central already http://search.maven.org/#browse%7C717101892 On Fri, Dec 19, 2014 at 11:17 AM, vboylin1...@gmail.com vboylin1...@gmail.com wrote: Hi, Dose any know when will spark 1.2 released? 1.2 has many great feature that we can't wait now ,-) Sincely Lin wukang 发自网易邮箱大师 -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
Re: When will spark 1.2 released?
Yup, as he posted before, An Apache infrastructure issue prevented me from pushing this last night. The issue was resolved today and I should be able to push the final release artifacts tonight. On Dec 18, 2014, at 10:14 PM, Andrew Ash and...@andrewash.com wrote: Patrick is working on the release as we speak -- I expect it'll be out later tonight (US west coast) or tomorrow at the latest. On Fri, Dec 19, 2014 at 1:09 AM, Ted Yu yuzhih...@gmail.com wrote: Interesting, the maven artifacts were dated Dec 10th. However vote for RC2 closed recently: http://search-hadoop.com/m/JW1q5K8onk2/Patrick+spark+1.2.0subj=Re+VOTE+Release+Apache+Spark+1+2+0+RC2+ Cheers On Dec 18, 2014, at 10:02 PM, madhu phatak phatak@gmail.com wrote: It’s on Maven Central already http://search.maven.org/#browse%7C717101892 On Fri, Dec 19, 2014 at 11:17 AM, vboylin1...@gmail.com vboylin1...@gmail.com wrote: Hi, Dose any know when will spark 1.2 released? 1.2 has many great feature that we can't wait now ,-) Sincely Lin wukang 发自网易邮箱大师 -- Regards, Madhukara Phatak http://www.madhukaraphatak.com