Re: How to use spark-submit
Hi Stephen, I am using the maven shade plugin for creating my uber jar. I have marked the Spark dependencies as provided. Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Mon, May 12, 2014 at 1:04 AM, Stephen Boesch java...@gmail.com wrote: Hi Sonal, Yes, I am working towards that same idea. How did you go about creating the non-spark-jar dependencies? The way I am doing it is a separate straw-man project that does not include Spark but has the external third-party jars included, then running sbt compile:managedClasspath and reverse engineering the lib jars from it. That is obviously not ideal. The maven run will be useful for other projects built by maven; I will keep it in my notes. As for sbt run-example, it requires additional libraries to be added for my external dependencies. I tried several items including ADD_JARS, --driver-class-path and combinations of extraClassPath. I have deferred that ad-hoc approach until finding a systematic one. 2014-05-08 5:26 GMT-07:00 Sonal Goyal sonalgoy...@gmail.com: I am creating a jar with only my dependencies and run spark-submit through my project's mvn build. I have configured the mvn exec goal to the location of the script. Here is how I have set it up for my app. The mainClass is my driver program, and I am able to send my custom args too. Hope this helps.
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>exec</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <executable>/home/sgoyal/spark/bin/spark-submit</executable>
    <arguments>
      <argument>${jars}</argument>
      <argument>--class</argument>
      <argument>${mainClass}</argument>
      <argument>--arg</argument>
      <argument>${spark.master}</argument>
      <argument>--arg</argument>
      <argument>${my app arg 1}</argument>
      <argument>--arg</argument>
      <argument>${my arg 2}</argument>
    </arguments>
  </configuration>
</plugin>

Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Wed, May 7, 2014 at 6:57 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Doesn't the run-example script work for you? Also, are you on the latest commit of branch-1.0? TD On Mon, May 5, 2014 at 7:51 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Yes, I'm struggling with a similar problem where my classes are not found on the worker nodes. I'm using 1.0.0_SNAPSHOT. I would really appreciate it if someone could provide some documentation on the usage of spark-submit. Thanks On May 5, 2014, at 10:24 PM, Stephen Boesch java...@gmail.com wrote: I have a Spark Streaming application that uses the external streaming modules (e.g. kafka, mqtt, ..) as well. It is not clear how to properly invoke the spark-submit script: what are the --driver-class-path and/or -Dspark.executor.extraClassPath parameters required? For reference, the following error is proving difficult to resolve: java.lang.ClassNotFoundException: org.apache.spark.streaming.examples.StreamingExamples
Re: File present but file not found exception
I found that if a file is present on all the nodes at the given path in the local FS, then reading is possible. But is there a way to read it if the file is present only on certain nodes? [There should be a way!] NEED: I want to do some filter ops on an HDFS file, create a local file of the result, create an RDD out of it and operate on that. Is there any way out? Thanks in advance! On Fri, May 9, 2014 at 12:18 AM, Sai Prasanna ansaiprasa...@gmail.com wrote: Hi Everyone, I think all are pretty busy; the response time in this group has slightly increased. Anyway, this is a pretty silly problem, but I could not get over it. I have a file in my local FS, but when I try to create an RDD out of it, the task fails with a file-not-found exception in the log files.

    var file = sc.textFile("file:///home/sparkcluster/spark/input.txt")
    file.top(1)

input.txt exists in the above folder but Spark still couldn't find it. Do some parameters need to be set? Any help is really appreciated. Thanks!!
Re: java.lang.NoSuchMethodError on Java API
Sure, I uploaded the code on pastebin: http://pastebin.com/90Hynrjh On Mon, May 12, 2014 at 12:27 AM, Madhu ma...@madhu.com wrote: No, you don't need to do anything special to get it to run in Eclipse. Just add the assembly jar to the build path, create a main method, add your code, and click the green run button. Can you post your code here? I can try it in my environment. - Madhu https://www.linkedin.com/in/msiddalingaiah -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-NoSuchMethodError-on-Java-API-tp5545p5567.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Alessandro De Carli Sonnmattstr. 121 CH-5242 Birr Email: decarli@gmail.com Twitter: @a_d_c_ Tel: +41 76 305 75 00 Web: http://www.papers.ch
Re: How to use spark-submit
@Sonal - makes sense. Is the maven shade plugin runnable within sbt? If so, would you care to share those build.sbt (or .scala) lines? If not, are you aware of a similar plugin for sbt? 2014-05-11 23:53 GMT-07:00 Sonal Goyal sonalgoy...@gmail.com: Hi Stephen, I am using the maven shade plugin for creating my uber jar. I have marked the Spark dependencies as provided. Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Mon, May 12, 2014 at 1:04 AM, Stephen Boesch java...@gmail.com wrote: Hi Sonal, Yes, I am working towards that same idea. How did you go about creating the non-spark-jar dependencies? The way I am doing it is a separate straw-man project that does not include Spark but has the external third-party jars included, then running sbt compile:managedClasspath and reverse engineering the lib jars from it. That is obviously not ideal. The maven run will be useful for other projects built by maven; I will keep it in my notes. As for sbt run-example, it requires additional libraries to be added for my external dependencies. I tried several items including ADD_JARS, --driver-class-path and combinations of extraClassPath. I have deferred that ad-hoc approach until finding a systematic one. 2014-05-08 5:26 GMT-07:00 Sonal Goyal sonalgoy...@gmail.com: I am creating a jar with only my dependencies and run spark-submit through my project's mvn build. I have configured the mvn exec goal to the location of the script. Here is how I have set it up for my app. The mainClass is my driver program, and I am able to send my custom args too. Hope this helps.
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>exec</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <executable>/home/sgoyal/spark/bin/spark-submit</executable>
    <arguments>
      <argument>${jars}</argument>
      <argument>--class</argument>
      <argument>${mainClass}</argument>
      <argument>--arg</argument>
      <argument>${spark.master}</argument>
      <argument>--arg</argument>
      <argument>${my app arg 1}</argument>
      <argument>--arg</argument>
      <argument>${my arg 2}</argument>
    </arguments>
  </configuration>
</plugin>

Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Wed, May 7, 2014 at 6:57 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Doesn't the run-example script work for you? Also, are you on the latest commit of branch-1.0? TD On Mon, May 5, 2014 at 7:51 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Yes, I'm struggling with a similar problem where my classes are not found on the worker nodes. I'm using 1.0.0_SNAPSHOT. I would really appreciate it if someone could provide some documentation on the usage of spark-submit. Thanks On May 5, 2014, at 10:24 PM, Stephen Boesch java...@gmail.com wrote: I have a Spark Streaming application that uses the external streaming modules (e.g. kafka, mqtt, ..) as well. It is not clear how to properly invoke the spark-submit script: what are the --driver-class-path and/or -Dspark.executor.extraClassPath parameters required? For reference, the following error is proving difficult to resolve: java.lang.ClassNotFoundException: org.apache.spark.streaming.examples.StreamingExamples
Re: How to read a multipart s3 file?
On Wed, May 7, 2014 at 4:00 AM, Han JU ju.han.fe...@gmail.com wrote: But in my experience, when reading directly from s3n, spark create only 1 input partition per file, regardless of the file size. This may lead to some performance problem if you have big files. You can (and perhaps should) always repartition() the RDD explicitly to increase your level of parallelism to match the number of cores in your cluster. It’s pretty quick, and will speed up all subsequent operations.
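What repartition() buys you here can be sketched in plain Python, no Spark needed (round-robin assignment below is just a stand-in for Spark's shuffle; the function name and the 4-core figure are illustrative assumptions):

```python
# Plain-Python sketch of what RDD.repartition(n) does for this case:
# the single input partition that s3n hands back is re-divided so
# every core gets a share of the records.
def repartition(records, n):
    chunks = [[] for _ in range(n)]
    for i, r in enumerate(records):
        chunks[i % n].append(r)   # round-robin, standing in for a shuffle
    return chunks

one_partition = list(range(10))          # 1 input split from s3n
chunks = repartition(one_partition, 4)   # e.g. 4 cores in the cluster
print([len(c) for c in chunks])          # [3, 3, 2, 2]
```

In Spark itself this is simply `rdd.repartition(numCores)` before the heavy transformations, so every subsequent stage runs with that parallelism.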
Re: Spark to utilize HDFS's mmap caching
Yes, Spark goes through the standard HDFS client and will automatically benefit from this. Matei On May 8, 2014, at 4:43 AM, Chanwit Kaewkasi chan...@gmail.com wrote: Hi all, Can Spark (0.9.x) utilize the caching feature in HDFS 2.3 via sc.textFile() and other HDFS-related APIs? http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html Best regards, -chanwit -- Chanwit Kaewkasi linkedin.com/in/chanwit
Re: build shark(hadoop CDH5) on hadoop2.0.0 CDH4
There was never a Hadoop 2.0.0; there was a Hadoop 2.0.0-alpha as far as Maven artifacts are concerned. The latest in that series is 2.0.6-alpha. On Mon, May 12, 2014 at 4:29 AM, Sophia sln-1...@163.com wrote: I have built Shark the sbt way, but this sbt exception turns up: [error] sbt.ResolveException: unresolved dependency: org.apache.hadoop#hadoop-client;2.0.0: not found. What can I do to build it correctly? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/build-shark-hadoop-CDH5-on-hadoop2-0-0-CDH4-tp5574.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Client cannot authenticate via:[TOKEN]
I'm trying to run spark-shell on Hadoop yarn. Specifically, the environment is as follows: - Client - OS: Windows 7 - Spark version: 1.0.0-SNAPSHOT (git cloned 2014.5.8) - Server - Platform: hortonworks sandbox 2.1 I modified the spark code to apply https://issues.apache.org/jira/browse/YARN-1824 so that the cross-platform issues are removed. (that is, $() to $$(), File.pathSeparator to ApplicationConstants.CLASS_PATH_SEPARATOR) Now when I run spark-shell on client(Windows 7), server log is produced as follows: ('owner' is the user name of the Windows 7 machine.) Log Type: stderr Log Length: 1356 log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 14/05/12 01:13:54 INFO YarnSparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/05/12 01:13:54 INFO SecurityManager: Changing view acls to: yarn,owner 14/05/12 01:13:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, owner) 14/05/12 01:13:55 INFO Slf4jLogger: Slf4jLogger started 14/05/12 01:13:56 INFO Remoting: Starting remoting 14/05/12 01:13:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkyar...@sandbox.hortonworks.com:47074] 14/05/12 01:13:56 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkyar...@sandbox.hortonworks.com:47074] 14/05/12 01:13:56 INFO RMProxy: Connecting to ResourceManager at /0.0.0.0:8030 14/05/12 01:13:56 INFO ExecutorLauncher: ApplicationAttemptId: appattempt_1399856448891_0018_01 14/05/12 01:13:56 INFO ExecutorLauncher: Registering the ApplicationMaster 14/05/12 01:13:56 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN] How can I handle this error? 
Thanks in advance.
Spark on Yarn - A small issue !
Hi All, I wanted to launch Spark on Yarn in interactive yarn-client mode. With default settings of yarn-site.xml and spark-env.sh, I followed the given link http://spark.apache.org/docs/0.8.1/running-on-yarn.html I get the pi value correct when I run without launching the shell. When I launch the shell with the following command:

SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.3.0.jar \
SPARK_YARN_APP_JAR=examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar \
MASTER=yarn-client ./spark-shell

and try to create RDDs and do some action on them, I get nothing, and after some time the tasks fail. Log file of Spark:

519095 14/05/12 13:30:40 INFO YarnClientClusterScheduler: YarnClientClusterScheduler.postStartHook done
519096 14/05/12 13:30:40 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:38355 with 324.4 MB RAM
519097 14/05/12 13:31:38 INFO MemoryStore: ensureFreeSpace(202584) called with curMem=0, maxMem=340147568
519098 14/05/12 13:31:38 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 197.8 KB, free 324.2 MB)
519099 14/05/12 13:31:49 INFO FileInputFormat: Total input paths to process : 1
519100 14/05/12 13:31:49 INFO NetworkTopology: Adding a new node: /default-rack/192.168.1.100:50010
519101 14/05/12 13:31:49 INFO SparkContext: Starting job: top at console:15
519102 14/05/12 13:31:49 INFO DAGScheduler: Got job 0 (top at console:15) with 4 output partitions (allowLocal=false)
519103 14/05/12 13:31:49 INFO DAGScheduler: Final stage: Stage 0 (top at console:15)
519104 14/05/12 13:31:49 INFO DAGScheduler: Parents of final stage: List()
519105 14/05/12 13:31:49 INFO DAGScheduler: Missing parents: List()
519106 14/05/12 13:31:49 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at top at console:15), which has no missing parents
519107 14/05/12 13:31:49 INFO DAGScheduler: Submitting 4 missing tasks from Stage 0 (MapPartitionsRDD[2] at top at console:15)
519108 14/05/12 13:31:49 INFO YarnClientClusterScheduler: Adding task set 0.0 with 4 tasks
519109 14/05/12 13:31:49 INFO RackResolver: Resolved s1 to /default-rack
519110 14/05/12 13:31:49 INFO ClusterTaskSetManager: Starting task 0.0:3 as TID 0 on executor 1: s1 (PROCESS_LOCAL)
519111 14/05/12 13:31:49 INFO ClusterTaskSetManager: Serialized task 0.0:3 as 1811 bytes in 4 ms
519112 14/05/12 13:31:49 INFO ClusterTaskSetManager: Starting task 0.0:0 as TID 1 on executor 1: s1 (NODE_LOCAL)
519113 14/05/12 13:31:49 INFO ClusterTaskSetManager: Serialized task 0.0:0 as 1811 bytes in 1 ms
519114 14/05/12 13:32:18 INFO YarnClientSchedulerBackend: Executor 1 disconnected, so removing it
519115 14/05/12 13:32:18 ERROR YarnClientClusterScheduler: Lost executor 1 on s1: remote Akka client shutdown
519116 14/05/12 13:32:18 INFO ClusterTaskSetManager: Re-queueing tasks for 1 from TaskSet 0.0
519117 14/05/12 13:32:18 WARN ClusterTaskSetManager: Lost TID 1 (task 0.0:0)
519118 14/05/12 13:32:18 WARN ClusterTaskSetManager: Lost TID 0 (task 0.0:3)
519119 14/05/12 13:32:18 INFO DAGScheduler: Executor lost: 1 (epoch 0)
519120 14/05/12 13:32:18 INFO BlockManagerMasterActor: Trying to remove executor 1 from BlockManagerMaster.
519121 14/05/12 13:32:18 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor

Do I need to set any other env variable specifically for Spark on YARN? What could be the issue? Can anyone please help me in this regard. Thanks in advance!!
Re: Is there any problem on the spark mailing list?
Note the mails are coming out of order in some cases. I am getting current messages but a sprinkling of old replies too. On May 12, 2014 12:16 PM, ankurdave ankurd...@gmail.com wrote: I haven't been getting mail either. This was the last message I received: http://apache-spark-user-list.1001560.n3.nabble.com/master-attempted-to-re-register-the-worker-and-then-took-all-workers-as-unregistered-tp553p5491.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-any-problem-on-the-spark-mailing-list-tp5509p5515.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark-env.sh do not take effect.
Hi, I set up a small cluster with 3 machines; every machine has 64GB RAM and 11 cores, and I use Spark 0.9. I have set spark-env.sh as follows:

    SPARK_MASTER_IP=192.168.35.2
    SPARK_MASTER_PORT=7077
    SPARK_MASTER_WEBUI_PORT=12306
    SPARK_WORKER_CORES=3
    SPARK_WORKER_MEMORY=20g
    SPARK_JAVA_OPTS+=-Dspark.executor.memory=5g

but I see the log in the master as follows:

    Spark Command: java -cp :/usr/local/spark-0.9.0/conf:/usr/local/spark-0.9.0/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip 192.168.35.2 --port 7077 --webui-port 12306
    log4j:WARN No appenders could be found for logger (akka.event.slf4j.Slf4jLogger).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    14/05/07 08:30:31 INFO Master: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    14/05/07 08:30:31 INFO Master: Starting Spark master at spark://192.168.35.2:7077
    14/05/07 08:30:31 INFO MasterWebUI: Started Master web UI at http://pug-master:12306
    14/05/07 08:30:31 INFO Master: I have been elected leader! New state: ALIVE
    14/05/07 08:30:34 INFO Master: Registering worker 192.168.35.2:52972 with 11 cores, 61.9 GB RAM
    14/05/07 08:30:34 INFO Master: Registering worker 192.168.35.2:43225 with 11 cores, 61.9 GB RAM

and the log in my worker as follows:

    Spark Command: java -cp :/usr/local/spark-0.9.0/conf:/usr/local/spark-0.9.0/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://192.168.35.2:7077
    log4j:WARN No appenders could be found for logger (akka.event.slf4j.Slf4jLogger).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    14/05/07 08:30:34 INFO Worker: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    14/05/07 08:30:34 INFO Worker: Starting Spark worker pug1:43225 with 11 cores, 61.9 GB RAM
    14/05/07 08:30:34 INFO Worker: Spark home: /usr/local/spark-0.9.0
    14/05/07 08:30:34 INFO WorkerWebUI: Started Worker web UI at http://pug1:8081
    14/05/07 08:30:34 INFO Worker: Connecting to master spark://192.168.35.2:7077...
    14/05/07 08:30:34 INFO Worker: Successfully registered with master spark://192.168.35.2:7077

I have checked that I did not misspell any configuration, and I used rsync to sync the spark-env.sh file from the master to the workers, but it seems that spark-env.sh does not take effect. I do not know what I have missed.
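Two observations, hedged since the thread does not resolve them: the -Xms512m -Xmx512m on the Master process is not by itself evidence of a problem, because the standalone daemons size themselves from SPARK_DAEMON_MEMORY (512 MB by default), not from SPARK_WORKER_MEMORY. The worker registering with 11 cores instead of 3, however, does suggest conf/spark-env.sh is not being sourced with the intended values at all. One thing worth checking is whether the variables are exported; a sketch of the file with explicit exports (an assumption, since the thread does not show whether export was used):

```shell
# conf/spark-env.sh -- sourced by the start scripts on every node
export SPARK_MASTER_IP=192.168.35.2
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=12306
export SPARK_WORKER_CORES=3
export SPARK_WORKER_MEMORY=20g
export SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS -Dspark.executor.memory=5g"
```

Also verify the file actually lives in the conf/ directory of the Spark installation the start scripts run from, on each machine.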
missing method in my slf4j after excluding Spark ZK log4j
Hey guys, I've asked before (in Spark 0.9; I now use 0.9.1) about removing the log4j dependency and was told that it was gone. However, I still find it as part of the zookeeper imports. This is fine since I exclude it myself in the sbt file, but another issue arises, and I wonder if anyone else has run into it. Spark uses log4j 1.2.17 and slf4j-log4j12 1.7.2. I use slf4j 1.7.5, logback 1.0.13, and log4j-over-slf4j 1.7.5. I think my slf4j 1.7.5 doesn't agree with what zookeeper expects of its log4j 1.2.17, because I get a missing-method error: java.lang.NoSuchMethodError: org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58) at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58) at scala.Option.map(Option.scala:145) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126) at org.apache.spark.SparkContext.init(SparkContext.scala:139) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500) at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76) ... Is there a way to find out what versions of slf4j I need to make it work with log4j 1.2.17? -Adrian
Re: Variables outside of mapPartitions scope
Right now I am not using any class variables (references to this). All my variables are created within the scope of the method I am running. I did more debugging and found this strange behavior. This results in a serialization bug:

    variables here
    for loop
        mapPartitions call
            use variables here
        end mapPartitions
    end for

but this won't:

    variables here
    for loop
        create new references to variables here
        mapPartitions call
            use new reference variables here
        end mapPartitions
    end for

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Variables-outside-of-mapPartitions-scope-tp5517p5528.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
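The behavior described above can be reproduced with plain pickle, outside Spark entirely; `DriverOnlyHandle` below is a hypothetical stand-in for whatever unserializable object the enclosing scope drags into the closure, and the "fresh reference" fix mirrors the second pseudocode variant:

```python
import pickle

class DriverOnlyHandle:
    """Stands in for something that must never be serialized
    (a socket, a context object, a lock, ...)."""
    def __reduce__(self):
        raise pickle.PicklingError("not serializable")

task_state = {"handle": DriverOnlyHandle(), "weights": [0.1, 0.2]}

# Shipping the whole enclosing state fails, like the first variant:
try:
    pickle.dumps(task_state)
except pickle.PicklingError as e:
    print("fails:", e)

# A fresh reference to only what the task needs succeeds,
# like the "create new references" variant:
weights = task_state["weights"]
print(len(pickle.dumps(weights)) > 0)
```

In Spark, closure serialization works the same way: creating a new local variable inside the loop narrows what the closure captures, so the unpicklable parts of the enclosing scope never get dragged along.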
Re: logging in pyspark
Ah, yes, that is correct. You need a serializable object one way or the other. An alternate suggestion would be to use a combination of RDD.sample() (http://spark.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#sample) and collect() to take a look at some small amount of data and just log it from the driver. That's pretty awkward as well, but it will spare you having to make some kind of serializable logger function. On Wed, May 7, 2014 at 9:32 AM, Diana Carroll dcarr...@cloudera.com wrote: foreach vs. map isn't the issue. Both require serializing the called function, so the pickle error would still apply, yes? And at the moment, I'm just testing. I definitely wouldn't want to log something for each element, but I may want to detect something and log for SOME elements. So my question is: how are other people doing logging from distributed tasks, given the serialization issues? The same issue actually exists in Scala, too. I could work around it by creating a small serializable object that provides a logger, but it seems kind of kludgy to me, so I'm wondering if other people are logging from tasks, and if so, how? Diana On Tue, May 6, 2014 at 6:24 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I think you're looking for RDD.foreach() (http://spark.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#foreach). According to the programming guide (http://spark.apache.org/docs/latest/scala-programming-guide.html): Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems. Do you really want to log something for each element of your RDD? Nick On Tue, May 6, 2014 at 3:31 PM, Diana Carroll dcarr...@cloudera.com wrote: What should I do if I want to log something as part of a task? This is what I tried.
To set up a logger, I followed the advice here: http://py4j.sourceforge.net/faq.html#how-to-turn-logging-on-off

    logger = logging.getLogger("py4j")
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.StreamHandler())

This works fine when I call it from my driver (i.e. pyspark):

    logger.info("this works fine")

But I want to try logging within a distributed task, so I did this:

    def logTestMap(a):
        logger.info("test")
        return a

    myrdd.map(logTestMap).count()

and got: PicklingError: Can't pickle 'lock' object So it's trying to serialize my function and can't because of a lock object used in logger, presumably for thread-safety. But then... how would I do it? Or is this just a really bad idea? Thanks Diana
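One shape the "small serializable object that provides a logger" can take, sketched with plain pickle (the class name and setup are illustrative, not pyspark API): store only the logger's name and look the real, lock-holding logger up lazily on the worker side.

```python
import logging
import pickle

class TaskLogger:
    """A picklable logger handle: it holds only the logger *name*
    and resolves the real (unpicklable) logger lazily, so it can be
    shipped inside a task closure without hitting the lock object."""
    def __init__(self, name):
        self.name = name

    def info(self, msg):
        logging.getLogger(self.name).info(msg)

log = TaskLogger("py4j")
# Simulate what Spark does to the closure: serialize and deserialize.
clone = pickle.loads(pickle.dumps(log))
clone.info("logged from a 'deserialized' task")
print(clone.name)  # py4j
```

On a real cluster each worker would still need its own handler/level configuration, since logging state is per-process, but the pickling error itself goes away because nothing unpicklable is captured.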
Forcing spark to send exactly one element to each worker node
Hi all, I'm currently trying to use pipe to run C++ code on each worker node, and I have an RDD of essentially command-line arguments that I'm passing to each node. I want to send exactly one element to each node, but when I run my code, Spark ends up sending multiple elements to a node: is there any way to force Spark to send only one? I've tried coalescing and repartitioning the RDD to be equal to the number of elements in the RDD, but that hasn't worked. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-spark-to-send-exactly-one-element-to-each-worker-node-tp5605.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Forcing spark to send exactly one element to each worker node
Fixed the problem as soon as I sent this out, sigh. Apparently you can do this by changing the number of slices to cut the dataset into: I thought that was identical to the amount of partitions, but apparently not. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-spark-to-send-exactly-one-element-to-each-worker-node-tp5605p5607.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
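The slicing that the numSlices parameter controls (e.g. sc.parallelize(data, numSlices)) can be sketched in plain Python; with numSlices equal to len(data), every slice ends up with exactly one element. The contiguous-range slicing below mirrors how Spark splits a parallelized collection, though that is an assumption worth checking against your Spark version:

```python
# Plain-Python sketch of splitting a collection into numSlices slices,
# the way sc.parallelize(data, numSlices) divides its input.
def slice_data(data, num_slices):
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

# One command-line-argument record per worker task:
args = ["run-a", "run-b", "run-c"]
slices = slice_data(args, len(args))
print(slices)  # [['run-a'], ['run-b'], ['run-c']]
```

With one element per slice there is one task per element, which is what pipe() then sees on each executor.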
Re: Turn BLAS on MacOSX
Those are warning messages, not errors. You need to add netlib-java:all to use native BLAS/LAPACK, but it won't work if you include netlib-java:all in an assembly jar; it has to be a separate jar when you submit your job. For SGD we only use level-1 BLAS, so I don't think native code is called. -Xiangrui On Sun, May 11, 2014 at 9:32 AM, DB Tsai dbt...@stanford.edu wrote: Hi Debasish, In https://github.com/apache/spark/blob/master/docs/mllib-guide.md, the Dependencies section talks about the native BLAS dependencies issue. For netlib, which breeze uses internally, if the native library isn't found, the jblas implementation will be used. Here is more detail about how to install the native library on different platforms: https://github.com/fommil/netlib-java/blob/master/README.md#machine-optimised-system-libraries Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, May 7, 2014 at 10:52 AM, Debasish Das debasish.da...@gmail.com wrote: Hi, How do I load native BLAS libraries on Mac? I am getting the following errors while running LR and SVM with SGD: 14/05/07 10:48:13 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 14/05/07 10:48:13 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS On CentOS it was fine, but on Mac I am getting these warnings. Also, when it fails to load native BLAS, does it use Java code for the BLAS operations? Maybe after the addition of breeze we should add these details on a page as well, so that users are aware of it before they report any performance results. Thanks. Deb
Re: Is their a way to Create SparkContext object?
You can just pass it around as a parameter. On May 12, 2014, at 12:37 PM, yh18190 yh18...@gmail.com wrote: Hi, could anyone suggest how we can create a SparkContext object in other classes or functions, where we need to convert a Scala collection to an RDD using the sc object (like sc.makeRDD(list)), instead of using the main class's SparkContext object? Is there a way to pass the sc object as a parameter to functions in other classes? Please let me know. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-their-a-way-to-Create-SparkContext-object-tp5612.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
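"Pass it around as a parameter" can look like the following plain-Python sketch; `FakeContext` and `build_rdd` are hypothetical stand-ins for SparkContext and a helper class, not Spark API:

```python
# Sketch of passing the context as a parameter instead of relying on a
# global defined in the main class.
class FakeContext:
    """Stand-in for SparkContext; makeRDD mirrors sc.makeRDD(list)."""
    def makeRDD(self, xs):
        return list(xs)  # a real context would return an RDD

def build_rdd(sc, data):
    # Any helper that needs the context just takes it as an argument.
    return sc.makeRDD(data)

sc = FakeContext()
print(build_rdd(sc, [1, 2, 3]))  # [1, 2, 3]
```

The same pattern works in Scala: give the helper class or function a `sc: SparkContext` parameter (or constructor argument) and hand it the driver's single context; there is no need to construct a second SparkContext.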
Is their a way to Create SparkContext object?
Hi, could anyone suggest how we can create a SparkContext object in other classes or functions, where we need to convert a Scala collection to an RDD using the sc object (like sc.makeRDD(list)), instead of using the main class's SparkContext object? Is there a way to pass the sc object as a parameter to functions in other classes? Please let me know. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-their-a-way-to-Create-SparkContext-object-tp5612.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
java.lang.StackOverflowError when calling count()
Dear Sparkers: I am using Python Spark of version 0.9.0 to implement some iterative algorithm. I got some errors, shown at the end of this email; they seem to be due to a Java stack overflow. The same error has been duplicated on a Mac desktop and a Linux workstation, both running the same version of Spark. The same line of code works correctly after quite some iterations. At the line of the error, rdd__new.count() could be 0. (In some previous rounds, this was also 0 without any problem.) Any thoughts on this? Thank you very much, - Guanhua CODE: print "round", round, rdd__new.count() File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py, line 542, in count 14/05/12 16:20:28 INFO TaskSetManager: Loss was due to java.lang.StackOverflowError [duplicate 1] return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; aborting job File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py, line 533, in sum 14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state FAILED from TID 1774 because its task set is gone return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py, line 499, in reduce vals = self.mapPartitions(func).collect() File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py, line 463, in collect bytesInJava = self._jrdd.collect().iterator() File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 537, in __call__ File /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o4317.collect.
: org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 times (most recent failure: Exception failure: java.lang.StackOverflowError) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) == The stack overflow error is shown as follows: == 14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774 java.lang.StackOverflowError at java.util.zip.Inflater.inflate(Inflater.java:259) at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152) at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116) at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310) at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323) at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2818) at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1452) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1511) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at
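A hedged guess at the cause, since the thread does not resolve it: in iterative algorithms each round extends the RDD lineage, and serializing/deserializing a very long lineage recurses once per step, which is a classic source of StackOverflowError in ObjectInputStream traces like the one above. The plain-Python sketch below models the lineage as a chain of closures and models the standard remedy, periodic checkpointing (rdd.checkpoint() plus an action in Spark), as materializing the value and dropping the chain:

```python
def build_lineage(n):
    """Each 'iteration' wraps the previous computation in another
    closure -- analogous to an RDD lineage growing by one per round."""
    f = lambda x: x
    for _ in range(n):
        prev = f
        f = lambda x, prev=prev: prev(x) + 1
    return f

deep = build_lineage(10000)
try:
    deep(0)                      # one stack frame per lineage step
except RecursionError:           # Python's analogue of StackOverflowError
    print("overflowed, just like the JVM trace above")

def iterate_with_checkpoint(x0, n, every=100):
    """Same computation, but the chain is collapsed every `every`
    steps -- morally what rdd.checkpoint() does for lineage."""
    f = lambda x: x
    x = x0
    depth = 0
    for _ in range(n):
        prev = f
        f = lambda x, prev=prev: prev(x) + 1
        depth += 1
        if depth == every:       # "checkpoint": materialize, drop chain
            x = f(x)
            f = lambda x: x
            depth = 0
    return f(x)

print(iterate_with_checkpoint(0, 10000))  # 10000
```

Whether lineage growth is actually what is happening here would need confirming against the job (e.g. whether the failure round number scales with iteration count), so treat this as one hypothesis to test rather than a diagnosis.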
Re: Average of each RDD in Stream
You mean you normally get an RDD, right? A DStream is a sequence of RDDs. It kind of depends on what you are trying to accomplish here? sum/count for each RDD in the stream? On Wed, May 7, 2014 at 6:43 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, I use the following code for calculating the average. The problem is that the reduce operation returns a DStream here and not a tuple as it normally does without Streaming. So how can we get the sum and the count from the DStream? Can we cast it to a tuple? val numbers = ssc.textFileStream(args(1)) val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce{ (a, b) => (a._1 + b._1, a._2 + b._2) } sumandcount.print() Regards, Laeeq
Average of each RDD in Stream
Hi, I use the following code for calculating the average. The problem is that the reduce operation returns a DStream here and not a tuple as it normally does without Streaming. So how can we get the sum and the count from the DStream? Can we cast it to a tuple? val numbers = ssc.textFileStream(args(1)) val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce{ (a, b) => (a._1 + b._1, a._2 + b._2) } sumandcount.print() Regards, Laeeq
Re: Forcing spark to send exactly one element to each worker node
A few more data points: my current theory is now that Spark's piping mechanism is considerably slower than just running the C++ app directly on the node. I ran the C++ application directly on a node in the cluster and timed the execution of various parts of the program: it took ~10 seconds to run the entire thing, ~6 seconds to run one particular function, and 2 seconds to run another function. I then used Spark's piping mechanism and got ~180 seconds to run the entire thing, 120 seconds to run the 6-second function, and 24 seconds to run the 2-second function. I was under the impression that pipe() would just run the C++ application on the remote node: is the application supposed to run slower if you use pipe() to execute it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-spark-to-send-exactly-one-element-to-each-worker-node-tp5605p5620.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
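Not an answer to the slowdown itself, but for readers unfamiliar with pipe(): its mechanics can be sketched outside Spark like this. This is a rough stand-in, not Spark's actual implementation, and it assumes a POSIX `cat` binary is available for the demo. The per-line serialization through the child process's stdin/stdout is one place overhead can accumulate on top of the program's own runtime.

```python
import subprocess

# Rough sketch of what RDD.pipe does for one partition: stream each element
# to the external command's stdin as a line, then collect its stdout lines.
def pipe_partition(elements, cmd):
    proc = subprocess.run(cmd, input="\n".join(elements) + "\n",
                          capture_output=True, text=True, check=True)
    return proc.stdout.splitlines()

print(pipe_partition(["a", "b", "c"], ["cat"]))  # cat echoes its input back
```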
Re: java.lang.NoSuchMethodError on Java API
I was able to compile your code in Eclipse. I ran it using the data in your comments, but I also see the NoSuchMethodError you mentioned. It seems to run fine until the call to calculateZVector(...). It appears that org.apache.commons.math3.util.Pair is not Serializable, so that's one potential problem. I created a Serializable version of Pair, but that wasn't enough. Commenting out this code: zVectorRaw.reduce(new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Tuple2<Integer, Double>>() { @Override public Tuple2<Integer, Double> call(Tuple2<Integer, Double> integerDoubleTuple2, Tuple2<Integer, Double> integerDoubleTuple22) throws Exception { return null; } }); avoids the NoSuchMethodError, so that might be part of your problem. Then I get a NotSerializableException, so my guess is there's a reference to something else that's not serializable in that method. There's a lot of stuff going on in that method, so it's not easy for me to follow. I would break it down into more manageable pieces and build it up one step at a time. Sorry I couldn't find the problem. - Madhu https://www.linkedin.com/in/msiddalingaiah -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-NoSuchMethodError-on-Java-API-tp5545p5623.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: missing method in my slf4j after excluding Spark ZK log4j
It sounds like you are doing everything right. NoSuchMethodError suggests it's finding log4j, just not the right version. That method is definitely in 1.2; it might have been removed in 2.x? (http://logging.apache.org/log4j/2.x/manual/migration.html) So I wonder if something is sneaking log4j 2.x into your app? That's a first guess. I'd say consult mvn dependency:tree, but you're on sbt and I don't know the equivalent. On Mon, May 12, 2014 at 3:51 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hey guys, I've asked before, in Spark 0.9 (I now use 0.9.1), about removing the log4j dependency and was told that it was gone. However I still find it as part of the zookeeper imports. This is fine since I exclude it myself in the sbt file, but another issue arises. I wonder if anyone else has run into this. Spark uses log4j v1.2.17 and slf4j-log4j12 1.7.2. I use slf4j 1.7.5, logback 1.0.13, and log4j-over-slf4j v1.7.5. I think my slf4j 1.7.5 doesn't agree with what zookeeper expects in its log4j v1.2.17, because I get a missing method error: java.lang.NoSuchMethodError: org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58) at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58) at scala.Option.map(Option.scala:145) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126) at org.apache.spark.SparkContext.init(SparkContext.scala:139) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500) at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76) ... Is there a way to find out what versions of slf4j I need to make it work with log4j 1.2.17? -Adrian
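For readers wanting to try the same exclusion, a minimal build.sbt sketch. This is only an illustration: the artifact coordinates and version numbers below are taken from versions mentioned in the thread, not verified against any particular Spark release.

```scala
// build.sbt sketch -- coordinates are illustrative.
// Swap Spark's transitive log4j binding out for log4j-over-slf4j + logback.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "0.9.1"
    exclude("log4j", "log4j")
    exclude("org.slf4j", "slf4j-log4j12"),
  "org.slf4j" % "log4j-over-slf4j" % "1.7.5",
  "ch.qos.logback" % "logback-classic" % "1.0.13"
)
```

Note that this only prevents log4j from appearing on the classpath; calls into org.apache.log4j (as in the stack trace above) then resolve against log4j-over-slf4j's shim classes, which is where a version mismatch can surface as NoSuchMethodError.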
Unexpected results when caching data
I have been experimenting with a data set with and without persisting the RDD and have come across some unexpected results. The files we are reading are Avro files, so we are using the following to define the RDD; what we end up with is an RDD[CleansedLogFormat]: val f = new NewHadoopRDD(sc, classOf[AvroKeyInputFormat[CleansedLogFormat]], classOf[AvroKey[CleansedLogFormat]], classOf[NullWritable], job.getConfiguration).map(_._1.datum()) f.count() = 110268763 f.persist(StorageLevel.MEMORY_AND_DISK).count() = 110268763 So far so good. Both the persisted and non-persisted RDDs return the same results for the count. Where things get weird is when I try and do some reduce by key or other grouping operations. Something like: f.map(record => (record.getProviderId.toString, record)).join(bandwidthKv).map { pair => val hour = new DateTime(pair._2._1.getTimeStamp).toString("yyyyMMddHH") (hour, Set(pair._2._1.getGuid)) }.reduceByKey(_ ++ _).collect().foreach { a => println(a._1 + ": " + a._2.size) } We then get different results from the non-persisted vs. the persisted version. Non-persisted: 2014050917: 7 2014050918: 42 Persisted: 2014050917: 7 2014050918: 12 Any idea what could account for the differences? BTW I am using Spark 0.9.1. Thanks, Paul -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unexpected-results-when-caching-data-tp5619.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark to utilize HDFS's mmap caching
Is that true? I believe that API Chanwit is talking about requires explicitly asking for files to be cached in HDFS. Spark automatically benefits from the kernel's page cache (i.e. if some block is in the kernel's page cache, it will be read more quickly). But the explicit HDFS cache is a different thing; Spark applications that want to use it would have to explicitly call the respective HDFS APIs. On Sun, May 11, 2014 at 11:04 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Yes, Spark goes through the standard HDFS client and will automatically benefit from this. Matei On May 8, 2014, at 4:43 AM, Chanwit Kaewkasi chan...@gmail.com wrote: Hi all, Can Spark (0.9.x) utilize the caching feature in HDFS 2.3 via sc.textFile() and other HDFS-related APIs? http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html Best regards, -chanwit -- Chanwit Kaewkasi linkedin.com/in/chanwit -- Marcelo
Re: missing method in my slf4j after excluding Spark ZK log4j
This gives dependency tree in SBT (spark uses this). https://github.com/jrudolph/sbt-dependency-graph TD On Mon, May 12, 2014 at 4:55 PM, Sean Owen so...@cloudera.com wrote: It sounds like you are doing everything right. NoSuchMethodError suggests it's finding log4j, just not the right version. That method is definitely in 1.2; it might have been removed in 2.x? (http://logging.apache.org/log4j/2.x/manual/migration.html) So I wonder if something is sneaking in log4j 2.x in your app? that's a first guess. I'd say consult mvn dependency:tree, but you're on sbt and I don't know the equivalent. On Mon, May 12, 2014 at 3:51 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hey guys, I've asked before, in Spark 0.9 - I now use 0.9.1, about removing log4j dependency and was told that it was gone. However I still find it part of zookeeper imports. This is fine since I exclude it myself in the sbt file, but another issue arises. I wonder if anyone else has run into this. Spark uses log4j v1.2.17 and slf4j-log4j12:1.7.2 I use slf4j 1.7.5, logback 1.0.13, and log4joverslf4j v 1.7.5 I think my slf4j 1.7.5 doesn't agree with what zookeeper expects in its log4j v 1.2.17 because I get missing method error: java.lang.NoSuchMethodError: org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58) at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58) at scala.Option.map(Option.scala:145) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126) at org.apache.spark.SparkContext.init(SparkContext.scala:139) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500) at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76) ... Is there a way to find out what versions of slf4j I need to make it work with log4j 1.2.17? -Adrian
Re: streaming on hdfs can detected all new file, but the sum of all the rdd.count() not equals which had detected
A very crucial thing to remember when using file streams is that the files must be written to the monitored directory atomically. That is, once the file system shows the file in its listing, the file should not be appended to / updated after that. That often causes this kind of issue, as Spark Streaming may see the file (soon after it is visible in the listing) and may try to process it even before all of the data has been written. So the best way to feed data into Spark Streaming is to write the file to a temp dir, and then move / rename it into the monitored directory. That makes it atomic. This is mentioned in the API docs of fileStream (http://spark.apache.org/docs/0.9.1/api/streaming/index.html#org.apache.spark.streaming.StreamingContext). TD On Sun, May 11, 2014 at 7:30 PM, zqf12345 zqf12...@gmail.com wrote: when I put 200 png files to HDFS, I found Spark Streaming could detect 200 files, but the sum of rdd.count() is less than 200, always between 130 and 170, I don't know why... Is this a bug? PS: When I put 200 files in HDFS before streaming runs, it gets the correct count and right result.
Here is the code:

def main(args: Array[String]) {
  val conf = new SparkConf().setMaster(SparkURL)
    .setAppName("QimageStreaming-broadcast")
    .setSparkHome(System.getenv("SPARK_HOME"))
    .setJars(SparkContext.jarOfClass(this.getClass()))
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  conf.set("spark.kryo.registrator", "qing.hdu.Image.MyRegistrator")
  conf.set("spark.kryoserializer.buffer.mb", "10")
  val ssc = new StreamingContext(conf, Seconds(2))
  val inputFormatClass = classOf[QimageInputFormat[Text, Qimage]]
  val outputFormatClass = classOf[QimageOutputFormat[Text, Qimage]]
  val input_path = HdfsURL + "/Qimage/input"
  val output_path = HdfsURL + "/Qimage/output/"
  val bg_path = HdfsURL + "/Qimage/bg/"
  val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage, QimageInputFormat[Text, Qimage]](bg_path)
  val bbg = bg.map(data => (data._1.toString(), data._2))
  val broadcastbg = ssc.sparkContext.broadcast(bbg)
  val file = ssc.fileStream[Text, Qimage, QimageInputFormat[Text, Qimage]](input_path)
  val qingbg = broadcastbg.value.collectAsMap
  val foreachFunc = (rdd: RDD[(Text, Qimage)], time: Time) => {
    val rddnum = rdd.count
    System.out.println("\n\n rddnum is " + rddnum + "\n\n")
    if (rddnum > 0) {
      System.out.println("here is foreachFunc")
      val a = rdd.keys
      val b = a.first
      val cbg = qingbg.get(getbgID(b)).getOrElse(new Qimage)
      rdd.map(data => (data._1, (new QimageProc(data._1, data._2)).koutu(cbg)))
        .saveAsNewAPIHadoopFile(output_path, classOf[Text], classOf[Qimage], outputFormatClass)
    }
  }
  file.foreachRDD(foreachFunc)
  ssc.start()
  ssc.awaitTermination()
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-on-hdfs-can-detected-all-new-file-but-the-sum-of-all-the-rdd-count-not-equals-which-had-ded-tp5572.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
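TD's write-to-staging-then-rename advice can be sketched with plain local filesystem calls. Directory and file names below are illustrative; on HDFS the equivalent is a rename within the same filesystem, which likewise makes the file appear in the monitored listing only in its complete form.

```python
import os
import tempfile

# Stand-ins for the HDFS input dir and a staging dir on the same filesystem.
monitored = tempfile.mkdtemp(prefix="monitored-")
staging = tempfile.mkdtemp(prefix="staging-")

# 1) Do the slow write out of sight of the stream.
tmp_path = os.path.join(staging, "part-00000")
with open(tmp_path, "w") as f:
    f.write("image bytes go here\n")

# 2) Atomic rename: the file becomes visible in the monitored dir fully written.
os.rename(tmp_path, os.path.join(monitored, "part-00000"))
```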
Re: java.lang.ClassNotFoundException
Hi Joe, Your messages are going into the spam folder for me. Thx, Archit_Thakur. On Fri, May 2, 2014 at 9:22 AM, Joe L selme...@yahoo.com wrote: Hi, You should include the jar file of your project, for example: conf.set("yourjarfilepath.jar") Joe On Friday, May 2, 2014 7:39 AM, proofmoore [via Apache Spark User List] [hidden email] wrote: Hello. I followed the "A Standalone App in Java" part of the tutorial https://spark.apache.org/docs/0.8.1/quick-start.html Spark standalone cluster looks like it's running without a problem: http://i.stack.imgur.com/7bFv8.png I have built a fat jar for running this JavaApp on the cluster. Before maven package: find . ./pom.xml ./src ./src/main ./src/main/java ./src/main/java/SimpleApp.java The content of SimpleApp.java is:

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

public class SimpleApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setMaster("spark://10.35.23.13:7077")
            .setAppName("My app")
            .set("spark.executor.memory", "1g");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String logFile = "/home/ubuntu/spark-0.9.1/test_data";
        JavaRDD<String> logData = sc.textFile(logFile).cache();
        long numAs = logData.filter(new Function<String, Boolean>() {
            public Boolean call(String s) { return s.contains("a"); }
        }).count();
        System.out.println("Lines with a: " + numAs);
    }
}

This program only works when master is set as setMaster("local"). Otherwise I get this error: http://i.stack.imgur.com/doRSn.png Thanks, Ibrahim
-- View this message in context: Re: java.lang.ClassNotFoundException Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to read a multipart s3 file?
One way to ensure Spark writes more partitions is by using RDD#repartition() to make each partition smaller. One Spark partition always corresponds to one file in the underlying store, and it's usually a good idea to have each partition's size range somewhere between 64 MB and 256 MB. Too few partitions leads to other problems, such as too little concurrency -- Spark can only run as many tasks as there are partitions, so if you don't have enough partitions, your cluster will be underutilized. On Tue, May 6, 2014 at 7:07 PM, kamatsuoka ken...@gmail.com wrote: Yes, I'm using s3:// for both. I was using s3n:// but I got frustrated by how slow it is at writing files. In particular the phase where it moves the temporary files to their permanent location takes as long as writing the file itself. I can't believe anyone uses this. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-a-multipart-s3-file-tp5463p5470.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
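The 64 MB to 256 MB guidance above turns into a one-line sizing rule. A hedged sketch: the 128 MB default below is just a midpoint of that range, not a Spark setting, and the function name is illustrative.

```python
# Pick a partition count so each partition (and thus each output file)
# lands near a target size.
def target_partitions(total_bytes, partition_bytes=128 * 1024 * 1024):
    # ceiling division, with a floor of one partition
    return max(1, -(-total_bytes // partition_bytes))

# e.g. a 54 GB dataset at ~128 MB per partition:
print(target_partitions(54 * 1024**3))  # 432
```

rdd.repartition(target_partitions(estimated_size)) would then write files of roughly the target size, at the cost of a shuffle.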
Re: Spark LIBLINEAR
It seems that the code isn't managed in github. Can be downloaded from http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/spark/spark-liblinear-1.94.zip It will be easier to track the changes in github. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Mon, May 12, 2014 at 7:53 AM, Xiangrui Meng men...@gmail.com wrote: Hi Chieh-Yen, Great to see the Spark implementation of LIBLINEAR! We will definitely consider adding a wrapper in MLlib to support it. Is the source code on github? Deb, Spark LIBLINEAR uses BSD license, which is compatible with Apache. Best, Xiangrui On Sun, May 11, 2014 at 10:29 AM, Debasish Das debasish.da...@gmail.com wrote: Hello Prof. Lin, Awesome news ! I am curious if you have any benchmarks comparing C++ MPI with Scala Spark liblinear implementations... Is Spark Liblinear apache licensed or there are any specific restrictions on using it ? Except using native blas libraries (which each user has to manage by pulling in their best proprietary BLAS package), all Spark code is Apache licensed. Thanks. Deb On Sun, May 11, 2014 at 3:01 AM, DB Tsai dbt...@stanford.edu wrote: Dear Prof. Lin, Interesting! We had an implementation of L-BFGS in Spark and already merged in the upstream now. We read your paper comparing TRON and OWL-QN for logistic regression with L1 (http://www.csie.ntu.edu.tw/~cjlin/papers/l1.pdf), but it seems that it's not in the distributed setup. Will be very interesting to know the L2 logistic regression benchmark result in Spark with your TRON optimizer and the L-BFGS optimizer against different datasets (sparse, dense, and wide, etc). I'll try your TRON out soon. 
Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Sun, May 11, 2014 at 1:49 AM, Chieh-Yen r01944...@csie.ntu.edu.tw wrote: Dear all, Recently we released a distributed extension of LIBLINEAR at http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/ Currently, TRON for logistic regression and L2-loss SVM is supported. We provided both MPI and Spark implementations. This is very preliminary so your comments are very welcome. Thanks, Chieh-Yen
Re: Bug when zip with longs and too many partitions?
I've discovered that it was noticed a year ago that RDD zip() does not work when the number of partitions does not evenly divide the total number of elements in the RDD: https://groups.google.com/forum/#!msg/spark-users/demrmjHFnoc/Ek3ijiXHr2MJ I will enter a JIRA ticket just as soon as the ASF Jira system will let me reset my password. On Sunday, May 11, 2014 4:40 AM, Michael Malak michaelma...@yahoo.com wrote: Is this a bug? scala> sc.parallelize(1 to 2,4).zip(sc.parallelize(11 to 12,4)).collect res0: Array[(Int, Int)] = Array((1,11), (2,12)) scala> sc.parallelize(1L to 2L,4).zip(sc.parallelize(11 to 12,4)).collect res1: Array[(Long, Int)] = Array((2,11))
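For contrast, the expected semantics shown on plain Python sequences (the analogue of what the second collect above should have returned):

```python
# zip pairs elements positionally; nothing should be dropped when both
# sides have the same length, however a framework partitions them.
pairs = list(zip([1, 2], [11, 12]))
print(pairs)  # [(1, 11), (2, 12)]

# Partition-wise zipping only stays correct when per-partition element
# counts line up on both sides -- the invariant the reported bug appears
# to break for the Long-range RDD.
parts_a, parts_b = [[1], [2]], [[11], [12]]
pairwise = [p for pa, pb in zip(parts_a, parts_b) for p in zip(pa, pb)]
```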
Re: build shark(hadoop CDH5) on hadoop2.0.0 CDH4
Hi, Why do I always run into remoting errors: akka.remote.RemoteTransportException and java.util.concurrent.TimeoutException? Best Regards, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/build-shark-hadoop-CDH5-on-hadoop2-0-0-CDH4-tp5574p5629.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: pySpark memory usage
Thanks, Aaron, this looks like a good solution! Will be trying it out shortly. I noticed that the S3 exceptions seem to occur more frequently when the box is swapping. Why is the box swapping? combineByKey seems to make the assumption that it can fit an entire partition in memory when doing the combineLocally step. I'm going to try to break this apart, but I will need some sort of heuristic; options include looking at memory usage via the resource module and trying to keep below 'spark.executor.memory', or using batchSize to limit the number of entries in the dictionary. Let me know if you have any opinions. On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson ilike...@gmail.com wrote: I'd just like to update this thread by pointing to the PR based on our initial design: https://github.com/apache/spark/pull/640 This solution is a little more general and avoids catching IOException altogether. Long live exception propagation! On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell pwend...@gmail.com wrote: Hey Jim, This IOException thing is a general issue that we need to fix, and your observation is spot-on. There is actually a JIRA for it here I created a few days ago: https://issues.apache.org/jira/browse/SPARK-1579 Aaron is assigned on that one but not actively working on it, so we'd welcome a PR from you on this if you are interested. The first thought we had was to set a volatile flag when the reader sees an exception (indicating there was a failure in the task) and avoid swallowing the IOException in the writer if this happens. But I think there is a race here where the writer sees the error first before the reader knows what is going on. Anyways maybe if you have a simpler solution you could sketch it out in the JIRA and we could talk over there. The current proposal in the JIRA is somewhat complicated...
- Patrick On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo jim.bl...@gmail.com wrote: FYI, it looks like this stdin writer to Python finished early error was caused by a break in the connection to S3, from which the data was being pulled. A recent commit to PythonRDD noted that the current exception catching can potentially mask an exception for the data source, and that is indeed what I see happening. The underlying libraries (jets3t and httpclient) do have retry capabilities, but I don't see a great way of setting them through Spark code. Instead I added the patch below which kills the worker on the exception. This allows me to completely load the data source after a few worker retries. Unfortunately, java.net.SocketException is the same error that is sometimes expected from the client when using methods like take(). One approach around this conflation is to create a new locally scoped exception class, eg. WriterException, catch java.net.SocketException during output writing, then re-throw the new exception. The worker thread could then distinguish between the reasons java.net.SocketException might be thrown. Perhaps there is a more elegant way to do this in Scala, though? Let me know if I should open a ticket or discuss this on the developers list instead. 
Best, Jim

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 0d71fdb..f31158c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
           readerException = e
           Try(worker.shutdownOutput()) // kill Python worker process
+        case e: java.net.SocketException =>
+          // This can happen if a connection to the datasource, eg S3, resets
+          // or is otherwise broken
+          readerException = e
+          Try(worker.shutdownOutput()) // kill Python worker process
+
         case e: IOException =>
           // This can happen for legitimate reasons if the Python code stops returning data
           // before we are done passing elements through, e.g., for take(). Just log a message to

On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo jim.bl...@gmail.com wrote: This dataset is uncompressed text at ~54GB. stats() returns (count: 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min: 343) On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Okay, thanks. Do you have any info on how large your records and data file are? I'd like to reproduce and fix this. Matei On Apr 9, 2014, at 3:52 PM, Jim Blomo jim.bl...@gmail.com wrote: Hi Matei, thanks for working with me to find these issues. To summarize, the issues I've seen are: 0.9.0: - https://issues.apache.org/jira/browse/SPARK-1323 SNAPSHOT 2014-03-18: - When persist() used and batchSize=1, java.lang.OutOfMemoryError: Java heap space. To me this indicates a memory
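Jim's proposed disambiguation in the earlier message can be sketched outside Spark. Names like WriterException are his suggestion, not an actual Spark class, and the write function here is a stand-in for the output-writing step.

```python
import socket

# Wrap a socket error raised while *writing* in a locally scoped exception,
# so the worker loop can tell it apart from an expected client-side
# disconnect (e.g. after take()) that surfaces as the same socket.error.
class WriterException(Exception):
    pass

def write_output(write_fn):
    try:
        write_fn()
    except socket.error as e:
        raise WriterException(str(e)) from e

def flaky_write():
    raise socket.error("Connection reset by peer")  # simulated S3 reset

try:
    write_output(flaky_write)
    handled = False
except WriterException:
    handled = True  # the worker can now be killed/retried deliberately
```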
Re: Average of each RDD in Stream
Use DStream.foreachRDD to do an operation on the final RDD of every batch. val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce{ (a, b) => (a._1 + b._1, a._2 + b._2) } sumandcount.foreachRDD { rdd => val first: (Double, Int) = rdd.take(1)(0) ; ... } DStream.reduce creates a DStream whose RDDs have just one tuple each. The rdd.take(1) above gets that one tuple. However, note that there is a corner case in this approach. If in a particular batch there is no data, then the rdd will have zero elements (no data, nothing to reduce). So you have to take that into account (maybe do an rdd.collect(), check the size, and then get the first / only element). TD On Wed, May 7, 2014 at 7:59 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, I use the following code for calculating the average. The problem is that the reduce operation returns a DStream here and not a tuple as it normally does without Streaming. So how can we get the sum and the count from the DStream? Can we cast it to a tuple? val numbers = ssc.textFileStream(args(1)) val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce{ (a, b) => (a._1 + b._1, a._2 + b._2) } sumandcount.print() Regards, Laeeq
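Putting TD's two points together (take the single reduced tuple, but guard the empty batch), the per-batch step can be sketched with a plain list standing in for the collected contents of the reduced RDD:

```python
# One batch's reduced RDD holds either zero tuples (empty batch) or one
# (sum, count) tuple, so collect() on it yields [] or [(sum, count)].
def batch_average(collected):
    if not collected:           # the corner case: no data in this interval
        return None
    total, count = collected[0]
    return total / count

print(batch_average([(6.0, 3)]))  # 2.0
print(batch_average([]))          # None
```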
Re: Proper way to stop Spark stream processing
Since you are using the latest Spark code and not Spark 0.9.1 (guessed from the log messages), you can actually do a graceful shutdown of a streaming context. This ensures that the receivers are properly stopped and all received data is processed, and then the system terminates (stop() stays blocked until then). See the other variations of streamingContext.stop(). TD On Mon, May 12, 2014 at 2:49 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hello, I am trying to implement something like "process a stream for N seconds, then return a result" with Spark Streaming (built from git head). My approach (which is probably not very elegant) is val ssc = new StreamingContext(...) ssc.start() future { Thread.sleep(Seconds(N)) ssc.stop(true) } ssc.awaitTermination() and in fact, this stops the stream processing. However, I get the following error messages: 14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver 14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to localhost: 14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found 14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found (where localhost: is the source I am reading the stream from). This doesn't actually seem like the proper way to do it. Can anyone point me to how to implement "stop after N seconds" without these error messages? Thanks Tobias
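The shape of Tobias's approach, sketched with plain threads: an Event stands in for the StreamingContext and its stop(), and the timings are illustrative, not meaningful.

```python
import threading
import time

stop_requested = threading.Event()   # plays the role of ssc.stop(...)
batches = []

def stream_loop():
    # stand-in for the running StreamingContext: process until told to stop
    while not stop_requested.is_set():
        batches.append("batch")
        time.sleep(0.01)

worker = threading.Thread(target=stream_loop)
worker.start()

# the "future { sleep(N); stop() }" part of the original snippet
threading.Timer(0.1, stop_requested.set).start()

worker.join(timeout=5)               # plays the role of awaitTermination()
```

The graceful-shutdown variant TD describes differs in that stop() additionally drains in-flight data before the loop exits, rather than just flipping a flag.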
Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.
Jacob Gerard - You might find the link below useful: http://rrati.github.io/blog/2014/05/07/apache-hadoop-plus-docker-plus-fedora-running-images/ For non-reverse-dns apps, NAT is your friend. Cheers, Tim - Original Message - From: Jacob Eisinger jeis...@us.ibm.com To: user@spark.apache.org Sent: Tuesday, May 6, 2014 8:30:23 AM Subject: Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs. Howdy, You might find the discussion Andrew and I have been having about Docker and network security [1] applicable. Also, I posted an answer [2] to your stackoverflow question. [1] http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-driver-interacting-with-Workers-in-YARN-mode-firewall-blocking-communication-tp5237p5441.html [2] http://stackoverflow.com/questions/23410505/how-to-run-hdfs-cluster-without-dns/23495100#23495100 Jacob D. Eisinger IBM Emerging Technologies jeis...@us.ibm.com - (512) 286-6075 Gerard Maas ---05/05/2014 04:18:08 PM---Hi Benjamin, Yes, we initially used a modified version of the AmpLabs docker scripts From: Gerard Maas gerard.m...@gmail.com To: user@spark.apache.org Date: 05/05/2014 04:18 PM Subject: Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs. Hi Benjamin, Yes, we initially used a modified version of the AmpLabs docker scripts [1]. The amplab docker images are a good starting point. One of the biggest hurdles has been HDFS, which requires reverse-DNS and I didn't want to go the dnsmasq route to keep the containers relatively simple to use without the need of external scripts. Ended up running a 1-node setup nnode+dnode. I'm still looking for a better solution for HDFS [2] Our usecase using docker is to easily create local dev environments both for development and for automated functional testing (using cucumber). My aim is to strongly reduce the time of the develop-deploy-test cycle. That also means that we run the minimum number of instances required to have a functionally working setup. 
E.g. 1 Zookeeper, 1 Kafka broker, ... For the actual cluster deployment we have a Chef-based devops toolchain that puts things in place on public cloud providers. Personally, I think Docker rocks and would like to replace those complex cookbooks with Dockerfiles once the technology is mature enough. -greetz, Gerard. [1] https://github.com/amplab/docker-scripts [2] http://stackoverflow.com/questions/23410505/how-to-run-hdfs-cluster-without-dns On Mon, May 5, 2014 at 11:00 PM, Benjamin bboui...@gmail.com wrote: Hi, Before considering running on Mesos, did you try to submit the application on Spark deployed without Mesos on Docker containers? Currently investigating this idea to deploy quickly a complete set of clusters with Docker, I'm interested in your findings on sharing the settings of Kafka and Zookeeper across nodes. How many brokers and zookeepers do you use? Regards, On Mon, May 5, 2014 at 10:11 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi all, I'm currently working on creating a set of docker images to facilitate local development with Spark/streaming on Mesos (+zk, hdfs, kafka). After solving the initial hurdles to get things working together in docker containers, now everything seems to start up correctly and the mesos UI shows slaves as they are started. I'm trying to submit a job from IntelliJ, and the job submissions seem to get lost in Mesos translation. The logs are not helping me to figure out what's wrong, so I'm posting them here in the hope that they can ring a bell and somebody could provide me a hint on what's wrong/missing with my setup.
DRIVER (IntelliJ running a Job.scala main) 14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for SHUFFLE_BLOCK_MANAGER 14/05/05 21:52:31 INFO BlockManager: Dropping broadcast blocks older than 1399319251962 14/05/05 21:52:31 INFO BlockManager: Dropping non broadcast blocks older than 1399319251962 14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for BROADCAST_VARS 14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for BLOCK_MANAGER 14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for HTTP_BROADCAST 14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for MAP_OUTPUT_TRACKER 14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for SPARK_CONTEXT MESOS MASTER I0505 19:52:39.718080 388 master.cpp:690] Registering framework 201405051517-67113388-5050-383-6995 at scheduler(1)@ 127.0.1.1:58115 I0505 19:52:39.718261 388 master.cpp:493] Framework 201405051517-67113388-5050-383-6995 disconnected I0505 19:52:39.718277 389 hierarchical_allocator_process.hpp:332] Added framework 201405051517-67113388-5050-383-6995 I0505 19:52:39.718312 388 master.cpp:520] Giving framework 201405051517-67113388-5050-383-6995 0ns to failover I0505 19:52:39.718431 389 hierarchical_allocator_process.hpp:408]
Re: pySpark memory usage
Hey Jim, unfortunately external spilling is not implemented in Python right now. While it would be possible to update combineByKey to do smarter stuff here, one simple workaround you can try is to launch more map tasks (or more reduce tasks). To set the minimum number of map tasks, you can pass it as a second argument to textFile and such (e.g. sc.textFile("s3n://foo.txt", 1000)).

Matei

On May 12, 2014, at 5:47 PM, Jim Blomo jim.bl...@gmail.com wrote:

Thanks, Aaron, this looks like a good solution! I will be trying it out shortly. I noticed that the S3 exceptions seem to occur more frequently when the box is swapping. Why is the box swapping? combineByKey seems to assume that it can fit an entire partition in memory when doing the combineLocally step. I'm going to try to break this apart, but I will need some sort of heuristic. Options include looking at memory usage via the resource module and trying to stay below spark.executor.memory, or using batchSize to limit the number of entries in the dictionary. Let me know if you have any opinions.

On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson ilike...@gmail.com wrote:

I'd just like to update this thread by pointing to the PR based on our initial design: https://github.com/apache/spark/pull/640 This solution is a little more general and avoids catching IOException altogether. Long live exception propagation!

On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell pwend...@gmail.com wrote:

Hey Jim, this IOException thing is a general issue that we need to fix, and your observation is spot-on. There is actually a JIRA for it that I created a few days ago: https://issues.apache.org/jira/browse/SPARK-1579 Aaron is assigned to that one but not actively working on it, so we'd welcome a PR from you on this if you are interested. The first thought we had was to set a volatile flag when the reader sees an exception (indicating there was a failure in the task) and avoid swallowing the IOException in the writer if this happens.
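Matei's workaround above — raising the number of input partitions so that each task's local combine fits in memory — can be illustrated without a cluster. The following is a pure-Python sketch of the idea (no Spark required; `combine_locally` and `partitioned_combine` are illustrative names, not Spark's actual implementation):

```python
def combine_locally(records, create, merge):
    """Build a per-partition dict, as combineByKey's combineLocally step does."""
    combined = {}
    for key, value in records:
        if key in combined:
            combined[key] = merge(combined[key], value)
        else:
            combined[key] = create(value)
    return combined

def partitioned_combine(records, num_partitions, create, merge):
    """More partitions -> smaller per-task dicts -> lower peak memory per task."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        # Hash partitioning: a given key always lands in the same partition.
        partitions[hash(key) % num_partitions].append((key, value))
    return [combine_locally(p, create, merge) for p in partitions]

data = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
results = partitioned_combine(data, num_partitions=4,
                              create=lambda v: v,
                              merge=lambda acc, v: acc + v)
merged = {}
for part in results:
    merged.update(part)  # keys are disjoint across partitions, so this is safe
print(merged == {"a": 4, "b": 7, "c": 4})  # True
```

The trade-off is the same one Matei implies: more partitions means more (smaller) tasks, so scheduling overhead grows while per-task memory shrinks.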
But I think there is a race here where the writer sees the error before the reader knows what is going on. Anyways, maybe if you have a simpler solution you could sketch it out in the JIRA and we could talk it over there. The current proposal in the JIRA is somewhat complicated...

- Patrick

On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo jim.bl...@gmail.com wrote:

FYI, it looks like this "stdin writer to Python finished early" error was caused by a break in the connection to S3, from which the data was being pulled. A recent commit to PythonRDD noted that the current exception catching can potentially mask an exception from the data source, and that is indeed what I see happening. The underlying libraries (jets3t and httpclient) do have retry capabilities, but I don't see a great way of setting them through Spark code. Instead I added the patch below, which kills the worker on the exception. This allows me to completely load the data source after a few worker retries.

Unfortunately, java.net.SocketException is the same error that is sometimes expected from the client when using methods like take(). One approach around this conflation is to create a new locally scoped exception class, e.g. WriterException, catch java.net.SocketException during output writing, then re-throw the new exception. The worker thread could then distinguish between the reasons java.net.SocketException might be thrown. Perhaps there is a more elegant way to do this in Scala, though? Let me know if I should open a ticket or discuss this on the developers list instead.
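The wrap-and-rethrow idea Jim describes — a locally scoped exception class so the worker can tell a write-side socket failure from a take()-style early exit — is language-agnostic. Here is a minimal Python sketch of the pattern (the names `WriterException`, `write_output`, and `run_task` are illustrative, not Spark's actual API; `OSError` stands in for java.net.SocketException):

```python
class WriterException(Exception):
    """Locally scoped wrapper: marks a socket error seen while writing output."""

def write_output(write):
    try:
        write()
    except OSError as e:  # stand-in for java.net.SocketException
        # Re-throw under a distinct type so the caller can tell a broken
        # data-source connection apart from an expected reader-side close.
        raise WriterException(str(e)) from e

def run_task(write, read):
    try:
        write_output(write)  # write side: OSError becomes WriterException
        read()               # read side: OSError propagates unchanged
        return "ok"
    except WriterException:
        return "restart worker"       # connection to data source broke
    except OSError:
        return "expected early exit"  # e.g. client stopped after take()
```

The same OSError now routes to different handlers depending on where it was raised, which is exactly the disambiguation the worker thread needs.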
Best, Jim

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 0d71fdb..f31158c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
           readerException = e
           Try(worker.shutdownOutput()) // kill Python worker process
+        case e: java.net.SocketException =>
+          // This can happen if a connection to the datasource, eg S3, resets
+          // or is otherwise broken
+          readerException = e
+          Try(worker.shutdownOutput()) // kill Python worker process
+
         case e: IOException =>
           // This can happen for legitimate reasons if the Python code stops returning data
           // before we are done passing elements through, e.g., for take(). Just log a message to

On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo jim.bl...@gmail.com wrote:

This dataset is uncompressed text at ~54GB. stats() returns (count: 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min: 343)

On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Okay, thanks.
Re: missing method in my slf4j after excluding Spark ZK log4j
Hi, Adrian --

If my memory serves, you need 1.7.7 of the various slf4j modules to avoid that issue. Best.

-- Paul — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Mon, May 12, 2014 at 7:51 AM, Adrian Mocanu amoc...@verticalscope.com wrote:

Hey guys, I've asked before (on Spark 0.9; I now use 0.9.1) about removing the log4j dependency and was told that it was gone. However, I still find it pulled in via the ZooKeeper imports. This is fine since I exclude it myself in the sbt file, but another issue arises, and I wonder if anyone else has run into it. Spark uses log4j v1.2.17 and slf4j-log4j12 1.7.2. I use slf4j 1.7.5, logback 1.0.13, and log4j-over-slf4j 1.7.5. I think my slf4j 1.7.5 doesn't agree with what ZooKeeper expects from its log4j v1.2.17, because I get a missing method error:

java.lang.NoSuchMethodError: org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V
    at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)
    at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126)
    at org.apache.spark.SparkContext.init(SparkContext.scala:139)
    at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500)
    at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76)
    ...

Is there a way to find out what versions of slf4j I need to make it work with log4j 1.2.17?

-Adrian
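For reference, one common way to set this up in sbt is to exclude the slf4j-log4j12 binding that Spark/ZooKeeper pull in and pin all slf4j modules to a single version. This is a sketch only — the 1.7.7 version follows Paul's suggestion, and the exact module list for your build is an assumption:

```scala
// build.sbt sketch: route all logging through slf4j 1.7.7 + logback
libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-core" % "0.9.1")
    .exclude("org.slf4j", "slf4j-log4j12"),        // drop the log4j binding
  "org.slf4j"      % "slf4j-api"        % "1.7.7",
  "org.slf4j"      % "log4j-over-slf4j" % "1.7.7", // bridges code that calls log4j directly
  "ch.qos.logback" % "logback-classic"  % "1.0.13"
)
```

The key point is that every slf4j module (api, bridges, binding) should be on the same 1.7.x version; a NoSuchMethodError like the one above typically means two of them disagree.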