Re: Cassandra examples don't work for me
You need Cassandra 1.2.6 for the Spark examples.

— Sent from Mailbox

On Thu, Jun 5, 2014 at 12:02 AM, Tim Kellogg t...@2lemetry.com wrote:

Hi,

I'm following the directions to run the Cassandra example "org.apache.spark.examples.CassandraTest" and I get this error:

    Exception in thread "main" java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
        at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:113)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:90)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:59)
        at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:370)
        at org.apache.spark.examples.CassandraTest$.main(CassandraTest.scala:100)
        at org.apache.spark.examples.CassandraTest.main(CassandraTest.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I'm running Cassandra version 2.0.6, and this comes from the spark-1.0.0-bin-hadoop2 distribution package. I am running the example with this command line:

    bin/run-example org.apache.spark.examples.CassandraTest localhost localhost 9160

I suspect it's because I'm running the wrong version of Cassandra, but I can't find the correct version listed anywhere. I hope this is an easy issue to address.

Much thanks, Tim
Re: Logistic Regression MLLib Slow
Hi Krishna,

Also, the default optimizer with SGD converges really slowly. If you are willing to write Scala code, there is a full working example for training logistic regression with L-BFGS (a quasi-Newton method) in Scala. It converges way faster than SGD. See http://spark.apache.org/docs/latest/mllib-optimization.html for details.

Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai

On Wed, Jun 4, 2014 at 7:56 PM, Xiangrui Meng men...@gmail.com wrote:

Hi Krishna, Specifying executor memory in local mode has no effect, because all of the threads run inside the same JVM. You can either try --driver-memory 60g or start a standalone server. Best, Xiangrui

On Wed, Jun 4, 2014 at 7:28 PM, Xiangrui Meng men...@gmail.com wrote:

80M by 4 should be about 2.5 GB uncompressed. 10 iterations shouldn't take that long, even on a single executor. Besides what Matei suggested, could you also verify the executor memory at http://localhost:4040 in the Executors tab? It is very likely the executors do not have enough memory. In that case, caching may be slower than reading directly from disk. -Xiangrui

On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Ah, is the file gzipped by any chance? We can't decompress gzipped files in parallel, so they get processed by a single task. It may also be worth looking at the application UI (http://localhost:4040) to see 1) whether all the data fits in memory in the Storage tab (maybe it somehow becomes larger, though it seems unlikely that it would exceed 20 GB) and 2) how many parallel tasks run in each iteration. Matei

On Jun 4, 2014, at 6:56 PM, Srikrishna S srikrishna...@gmail.com wrote:

I am using the MLlib one (LogisticRegressionWithSGD) with PySpark. I am running only 10 iterations. The MLlib version of logistic regression doesn't seem to use all the cores on my machine. Regards, Krishna

On Wed, Jun 4, 2014 at 6:47 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Are you using the logistic_regression.py in examples/src/main/python or examples/src/main/python/mllib? The first one is an example of writing logistic regression by hand and won't be as efficient as the MLlib one. I suggest trying the MLlib one. You may also want to check how many iterations it runs — by default I think it runs 100, which may be more than you need. Matei

On Jun 4, 2014, at 5:47 PM, Srikrishna S srikrishna...@gmail.com wrote:

Hi All, I am new to Spark and I am trying to run LogisticRegression (with SGD) using MLlib on a beefy single machine with about 128 GB RAM. The dataset has about 80M rows with only 4 features, so it barely occupies 2 GB on disk. I am running the code using all 8 cores with 20G memory:

    spark-submit --executor-memory 20G --master local[8] logistic_regression.py

It seems to take about 3.5 hours without caching and over 5 hours with caching. What is the recommended use for Spark on a beefy single machine? Any suggestions will help!
Regards, Krishna

Code sample:

    # Dataset
    d = sys.argv[1]
    data = sc.textFile(d)

    # Load and parse the data
    # --
    def parsePoint(line):
        values = [float(x) for x in line.split(',')]
        return LabeledPoint(values[0], values[1:])

    _parsedData = data.map(parsePoint)
    parsedData = _parsedData.cache()
    results = {}

    # Spark
    # --
    start_time = time.time()
    # Build the gl_model
    niters = 10
    spark_model = LogisticRegressionWithSGD.train(parsedData, iterations=niters)

    # Evaluate the gl_model on training data
    labelsAndPreds = parsedData.map(lambda p: (p.label, spark_model.predict(p.features)))
    trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
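[For reference, the L-BFGS route DB Tsai points to looks roughly like the sketch below, condensed from the mllib-optimization page. It assumes an existing RDD[LabeledPoint] named `data`; the numeric settings are illustrative, not recommendations.]

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
    import org.apache.spark.mllib.util.MLUtils

    // runLBFGS expects (label, features) pairs; append a bias term for the intercept.
    val numFeatures = data.take(1)(0).features.size
    val training = data.map(p => (p.label, MLUtils.appendBias(p.features))).cache()

    val initialWeights = Vectors.dense(new Array[Double](numFeatures + 1))
    val (weightsWithIntercept, lossHistory) = LBFGS.runLBFGS(
      training,
      new LogisticGradient(),
      new SquaredL2Updater(),
      10,    // numCorrections: history size for the quasi-Newton update
      1e-4,  // convergence tolerance
      20,    // max iterations
      0.1,   // L2 regularization parameter
      initialWeights)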
Re: Logistic Regression MLLib Slow
Hi Krishna,

It should work, and we use it in production with great success. However, the constructor of LogisticRegressionModel is private[mllib], so you have to write your own code with the package name under org.apache.spark.mllib, instead of using the Scala console.

Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai

On Wed, Jun 4, 2014 at 11:47 PM, Srikrishna S srikrishna...@gmail.com wrote:

Does L-BFGS work with Spark 1.0? (See the code sample below.) Eventually I would like to have L-BFGS working, but I was facing an issue where 10 passes over the data were taking forever. I ran Spark in standalone mode and the performance is much better! Regards, Krishna

I am following http://spark.apache.org/docs/latest/mllib-optimization.html:

    scala> val model = new LogisticRegressionModel(
         |   Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
         |   weightsWithIntercept(weightsWithIntercept.size - 1))
    <console>:20: error: constructor LogisticRegressionModel in class LogisticRegressionModel cannot be accessed in class $iwC
           val model = new LogisticRegressionModel(

Based on the documentation, it would seem like LogisticRegressionModel doesn't have a public constructor: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionModel LogisticRegressionWithSGD *does* have one: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithSGD

On Wed, Jun 4, 2014 at 11:33 PM, DB Tsai dbt...@stanford.edu wrote:

Hi Krishna, Also, the default optimizer with SGD converges really slowly. If you are willing to write Scala code, there is a full working example for training logistic regression with L-BFGS (a quasi-Newton method) in Scala. It converges way faster than SGD. See http://spark.apache.org/docs/latest/mllib-optimization.html for details. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai

On Wed, Jun 4, 2014 at 7:56 PM, Xiangrui Meng men...@gmail.com wrote: Hi Krishna, Specifying executor memory in local mode has no effect, because all of the threads run inside the same JVM. You can either try --driver-memory 60g or start a standalone server. Best, Xiangrui

On Wed, Jun 4, 2014 at 7:28 PM, Xiangrui Meng men...@gmail.com wrote: 80M by 4 should be about 2.5 GB uncompressed. 10 iterations shouldn't take that long, even on a single executor. Besides what Matei suggested, could you also verify the executor memory at http://localhost:4040 in the Executors tab? It is very likely the executors do not have enough memory. In that case, caching may be slower than reading directly from disk. -Xiangrui

On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Ah, is the file gzipped by any chance? We can't decompress gzipped files in parallel, so they get processed by a single task. It may also be worth looking at the application UI (http://localhost:4040) to see 1) whether all the data fits in memory in the Storage tab (maybe it somehow becomes larger, though it seems unlikely that it would exceed 20 GB) and 2) how many parallel tasks run in each iteration. Matei

On Jun 4, 2014, at 6:56 PM, Srikrishna S srikrishna...@gmail.com wrote: I am using the MLlib one (LogisticRegressionWithSGD) with PySpark. I am running only 10 iterations. The MLlib version of logistic regression doesn't seem to use all the cores on my machine. Regards, Krishna

On Wed, Jun 4, 2014 at 6:47 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Are you using the logistic_regression.py in examples/src/main/python or examples/src/main/python/mllib? The first one is an example of writing logistic regression by hand and won't be as efficient as the MLlib one. I suggest trying the MLlib one. You may also want to check how many iterations it runs — by default I think it runs 100, which may be more than you need. Matei

On Jun 4, 2014, at 5:47 PM, Srikrishna S srikrishna...@gmail.com wrote: Hi All, I am new to Spark and I am trying to run LogisticRegression (with SGD) using MLlib on a beefy single machine with about 128 GB RAM. The dataset has about 80M rows with only 4 features, so it barely occupies 2 GB on disk. I am running the
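[A sketch of the package trick DB describes, assuming the weights come out of runLBFGS as above. The object and method names are made up; what matters is the package declaration, which grants access to the private[mllib] constructor once this file is compiled into your own jar.]

    package org.apache.spark.mllib.classification

    import org.apache.spark.mllib.linalg.{Vector, Vectors}

    object LogisticRegressionModelBuilder {
      // Split a combined (weights, intercept) vector into a usable model.
      def fromWeights(weightsWithIntercept: Vector): LogisticRegressionModel = {
        val n = weightsWithIntercept.size
        new LogisticRegressionModel(
          Vectors.dense(weightsWithIntercept.toArray.slice(0, n - 1)),
          weightsWithIntercept(n - 1))
      }
    }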
Re: Join : Giving incorrect result
Sorry for replying late; it was night here. Lian / Matei, here is the code snippet:

    sparkConf.set("spark.executor.memory", "10g")
    sparkConf.set("spark.cores.max", "5")
    val sc = new SparkContext(sparkConf)

    val accId2LocRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location")
      .map(getKeyValueFromString(_, 0, ',', true))
    val accId2DemoRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType")
      .map(getKeyValueFromString(_, 0, ',', true))

    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

    def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, retFullLine: Boolean): Tuple2[String, String] = {
      val splits = line.split(delimit)
      if (splits.length <= 1) {
        (null, null)
      } else if (retFullLine) {
        (splits(keyIndex), line)
      } else {
        (splits(keyIndex), splits(splits.length - keyIndex - 1))
      }
    }

Both of these files have 10M records with the same unique keys. The size of each file is nearly 280 MB, and the block size in HDFS is 256 MB. The output of the join should contain 10M records.

We have done some more experiments:
1) Running cogroup instead of join — it also gives an incorrect count.
2) Running union followed by groupByKey and then filtering records with two entries in the sequence — it also gives an incorrect count.
3) Increasing spark.executor.memory to 50g — everything works fine. The count comes to 10M for the join, cogroup, and union/groupByKey/filter transformations.

I thought that 10g was enough memory for the executors, but even if the memory is less, it should not result in an incorrect computation. Probably there is a problem in reconstructing RDDs when memory is not enough.

Thanks Chen for your observation. I get this problem on a single worker, so there will not be any mismatch of jars. On two workers, since the executor memory gets doubled, the code works fine.

Regards, Ajay

On Thursday, June 5, 2014 1:35 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

If this isn't the problem, it would be great if you can post the code for the program. Matei

On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote:

Maybe your two workers have different assembly jar files? I just ran into a similar problem where my spark-shell was using a different jar file than my workers — I got really confusing results.

On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote:

Hi, I am doing a join of two RDDs which is giving different results (counting the number of records) each time I run this code on the same input. The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random: every time, the count of joined records is different. Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1. Regards, Ajay
Re: Can this be done in map-reduce technique (in parallel)
Hi Cheng, Sorry again. In this method, I see that the values for

    a <- positions.iterator
    b <- positions.iterator

always remain the same. I tried to do b <- positions.iterator.next, and it throws an error: value filter is not a member of (Double, Double). Is there something I am missing here? Thanks
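[An iterator can only be traversed once, which is the usual cause of the behaviour described above; and positions.iterator.next yields a single (Double, Double) tuple, which is why the for-comprehension complains about filter. A sketch of one way to enumerate the distinct pairs, assuming positions is an Iterable[(Double, Double)] of the coordinates for one IP:]

    // Materialize once so the collection can be traversed repeatedly.
    val pts = positions.toIndexedSeq
    val pairs = for {
      i <- pts.indices
      j <- (i + 1) until pts.size   // j > i avoids duplicates and self-pairs
    } yield (pts(i), pts(j))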
Re: Re: mismatched hdfs protocol
OK, I see — I imported the wrong jar files, which only work with the default Hadoop version. 2014-06-05 bluejoe2008

From: prabeesh k Date: 2014-06-05 16:14 To: user Subject: Re: Re: mismatched hdfs protocol

If you are not setting the Spark Hadoop version, Spark is built against the default Hadoop version (1.0.4). Before importing the Spark 1.0.0 libraries, build Spark with:

    SPARK_HADOOP_VERSION=2.4.0 sbt/sbt assembly

On Thu, Jun 5, 2014 at 12:28 PM, bluejoe2008 bluejoe2...@gmail.com wrote:

Thank you! I am developing a Java project in the Eclipse IDE on Windows, in which the Spark 1.0.0 libraries are imported, and now I want to open HDFS files as input. The Hadoop version of the HDFS cluster is 2.4.0. 2014-06-05 bluejoe2008

From: prabeesh k Date: 2014-06-05 13:23 To: user Subject: Re: mismatched hdfs protocol

For building Spark against a particular version of Hadoop, refer to http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html

On Thu, Jun 5, 2014 at 8:14 AM, Koert Kuipers ko...@tresata.com wrote:

You have to build Spark against the version of Hadoop you are using.

On Wed, Jun 4, 2014 at 10:25 PM, bluejoe2008 bluejoe2...@gmail.com wrote:

Hi all, when my Spark program accessed HDFS files, an error happened:

    Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4

It seems the client was trying to connect to Hadoop 2 via an old Hadoop protocol. So my question is: how do I specify the version of Hadoop on connection? Thank you! bluejoe 2014-06-05
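[If the project pulls Spark in through sbt rather than hand-imported jars, the same mismatch can be addressed on the client side by depending on an HDFS client that matches the cluster. A sketch, assuming an sbt build; versions are taken from this thread:]

    // build.sbt fragment: use a Hadoop 2.4.0 client so the RPC versions match.
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.0.0",
      "org.apache.hadoop" % "hadoop-client" % "2.4.0"
    )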
Re: Can this be done in map-reduce technique (in parallel)
Lakshmi, this is orthogonal to your question, but in case it's useful: it sounds like you're trying to determine the home location of a user, or something similar. If that's the problem statement, the data pattern may suggest a far more computationally efficient approach. For example, first map all (lat, long) pairs into geocells of a desired resolution (e.g., 10 m or 100 m), then count occurrences of geocells instead. There are simple libraries to map any (lat, long) pair into a geocell (string) ID very efficiently. (A rough sketch of this pattern follows below.)

--
Christopher T. Nguyen
Co-founder & CEO, Adatao
http://adatao.com
linkedin.com/in/ctnguyen

On Wed, Jun 4, 2014 at 3:49 AM, lmk lakshmi.muralikrish...@gmail.com wrote:

Hi, I am a new Spark user. Please let me know how to handle the following scenario. I have a data set with the following fields:

1. DeviceId
2. latitude
3. longitude
4. ip address
5. Datetime
6. Mobile application name

With the above data, I would like to perform the following steps:

1. Collect all lat and lon pairs for each IP address: (ip1, ((lat1,lon1),(lat2,lon2))), (ip2, ((lat3,lon3),(lat4,lon4)))
2. For each IP:
   1. Find the distance between each lat-lon coordinate pair and all the other pairs under the same IP.
   2. Select those coordinates whose distances fall under a specific threshold (say 100 m).
   3. Find the coordinate pair with the maximum occurrences.

In this case, how can I iterate over and compare each coordinate pair with all the other pairs? Can this be done in a distributed manner, as this data set is going to have a few million records? Can we do this with map/reduce commands? Thanks.
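[The sketch referenced above, in Scala. The cell function is a hypothetical stand-in for a real geocell library: it simply truncates coordinates to a fixed grid (0.001° is very roughly 100 m at the equator), which is enough to show the counting pattern; records is assumed to be an RDD[(String, Double, Double)] of (ip, lat, lon).]

    // Hypothetical stand-in for a geocell library.
    def cellId(lat: Double, lon: Double, cellDeg: Double = 0.001): String =
      math.floor(lat / cellDeg).toLong + ":" + math.floor(lon / cellDeg).toLong

    val topCellPerIp = records
      .map { case (ip, lat, lon) => ((ip, cellId(lat, lon)), 1) }
      .reduceByKey(_ + _)                                 // visits per (ip, cell)
      .map { case ((ip, cell), n) => (ip, (cell, n)) }
      .reduceByKey((a, b) => if (a._2 >= b._2) a else b)  // most frequent cell per ip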
Re: Can't seem to link external/twitter classes from my own app
I shan't be far. I'm committed now. Spark and I are going to have a very interesting future together, but hopefully future messages will be about the algorithms and modules, and less "how do I run make?". I suspect doing this at the exact moment of the 0.9-to-1.0.0 transition hasn't helped me. (I literally had the documentation changing on me between page reloads last Thursday, after days of studying the old version. I thought I was going crazy until the new version number appeared in the corner and the release email went out.)

The last time I entered into a serious relationship with a piece of software like this was with a little company called Cognos. :-) And then Microsoft asked us for some advice about a thing called OLAP Server they were making. (But I don't think they listened as hard as they should have.) Oh, the things I'm going to do with Spark! If it hadn't existed, I would have had to make it. (My honors thesis was in distributed computing. I once created an incrementally compiled language that could pause execution, decompile, move to another machine, recompile, restore state, and continue, while preserving all active network connections. Discuss.)

On Thu, Jun 5, 2014 at 5:46 PM, Nick Pentreath nick.pentre...@gmail.com wrote:

Great — well, we do hope we hear from you, since the user list is for interesting success stories and anecdotes, as well as blog posts etc. too :)

On Thu, Jun 5, 2014 at 9:40 AM, Jeremy Lee unorthodox.engine...@gmail.com wrote:

Oh. Yes of course. *facepalm* I'm sure I typed that at first, but at some point my fingers decided to grammar-check me. Stupid fingers. I wonder what "sbt assemble" does? (Apart from error.) It certainly takes a while to do it. Thanks for the Maven offer, but I'm not scheduled to learn that until after Scala, streaming, GraphX, MLlib, HDFS, sbt, Python, and YARN. I'll probably need to know it for YARN, but I'm really hoping to put it off until then. (Fortunately I already knew about Linux, AWS, Eclipse, git, Java, distributed programming, and ssh keyfiles, or I would have been in real trouble.)

Ha! OK, that worked for the Kafka project... it fails on the other old 0.9 Twitter project, but who cares... now for mine. HAHA! YES!! Oh, thank you! I have the equivalent of "hello world" that uses one external library! Now the compiler and I can have a _proper_ conversation. Hopefully you won't be hearing from me for a while.

On Thu, Jun 5, 2014 at 3:06 PM, Nick Pentreath nick.pentre...@gmail.com wrote:

The magic incantation is "sbt assembly" (not "assemble"). Actually I find Maven with the assembly plugin to be very easy (mvn package). I can send a pom.xml for a skeleton project if you need. — Sent from Mailbox

On Thu, Jun 5, 2014 at 6:59 AM, Jeremy Lee unorthodox.engine...@gmail.com wrote:

Hmm... that's not working so well for me. First, I needed to add a project/plugin.sbt file with the contents:

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.4")

before 'sbt/sbt assemble' worked at all. And I'm not sure about that version number, but 0.9.1 isn't working much better, and 11.4 is the latest one recommended by the sbt project site. Where did you get your version from? Second, even when I do get it to build a .jar, spark-submit is still telling me the external.twitter library is missing. I tried using your GitHub project as-is, but it also complained about the missing plugin. I'm trying it with various versions now to see if I can get that working, even though I don't know anything about Kafka. Hmm, and no.
Here's what I get:

    [info] Set current project to Simple Project (in build file:/home/ubuntu/spark-1.0.0/SparkKafka/)
    [error] Not a valid command: assemble
    [error] Not a valid project ID: assemble
    [error] Expected ':' (if selecting a configuration)
    [error] Not a valid key: assemble (similar: assembly, assemblyJarName, assemblyDirectory)
    [error] assemble
    [error]

I also found this project, which seemed to be exactly what I was after: https://github.com/prabeesh/SparkTwitterAnalysis ...but it was for Spark 0.9, and though I updated all the version references to 1.0.0, that one doesn't work either. I can't even get it to build. *sigh* Is it going to be easier to just copy the external/ source code into my own project? Because I will... especially if creating uberjars takes this long every... single... time...

On Thu, Jun 5, 2014 at 8:52 AM, Jeremy Lee unorthodox.engine...@gmail.com wrote:

Thanks Patrick! Uberjars. Cool. I'd actually heard of them. And thanks for the link to the example! I shall work through that today. I'm still learning sbt and its many options... the last new framework I learned was node.js, and I think I've been rather spoiled by npm. At least it's not Maven. Please, oh please don't make me learn Maven too. (The only people who seem to like it have Software Stockholm Syndrome: "I know Maven kidnapped me and
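[For reference, a minimal setup of the kind this thread converges on — a hedged sketch, not taken from the projects mentioned above. sbt-assembly 0.11.x is wired in via project/plugins.sbt, and spark-core is marked "provided" so the uberjar doesn't clash with the jars spark-submit already supplies:]

    // project/plugins.sbt
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.4")

    // build.sbt
    import AssemblyKeys._

    assemblySettings

    name := "my-streaming-app"

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
      "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
    )

Running "sbt assembly" should then produce a single jar under target/scala-2.10/ that can be handed to spark-submit.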
Re: Unable to run a Standalone job
Try the "sbt clean" command before building the app, or delete the .ivy2 and .sbt folders (not a good method). Then try to rebuild the project.

On Thu, Jun 5, 2014 at 11:45 AM, Sean Owen so...@cloudera.com wrote:

I think this is SPARK-1949 again: https://github.com/apache/spark/pull/906 I think this change fixed this issue for a few people using the SBT build — worth committing?

On Thu, Jun 5, 2014 at 6:40 AM, Shrikar archak shrika...@gmail.com wrote:

Hi All, Now that Spark version 1.0.0 is released, there should not be any problem with the local jars.

    Shrikars-MacBook-Pro:SimpleJob shrikar$ cat simple.sbt
    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0",
                                "org.apache.spark" %% "spark-streaming" % "1.0.0")

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

I am still having this issue:

    [error] (run-main) java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
    java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
        at org.apache.spark.HttpServer.start(HttpServer.scala:54)
        at org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
        at org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
        at org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
        at org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
        at org.apache.spark.broadcast.BroadcastManager.<init>(BroadcastManager.scala:35)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:202)

Any help would be greatly appreciated. Thanks, Shrikar

On Fri, May 23, 2014 at 3:58 PM, Shrikar archak shrika...@gmail.com wrote:

Still the same error, no change. Thanks, Shrikar

On Fri, May 23, 2014 at 2:38 PM, Jacek Laskowski ja...@japila.pl wrote:

Hi Shrikar, How did you build Spark 1.0.0-SNAPSHOT on your machine? My understanding is that `sbt publishLocal` is not enough and you really need `sbt assembly` instead. Give it a try and report back. As to your build.sbt, upgrade Scala to 2.10.4 and use "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT" only — that will pull down spark-core as a transitive dep. The resolver for the Akka Repository is not needed. Your build.sbt should really look as follows:

    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.10.4"

    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT"

Jacek

On Thu, May 22, 2014 at 11:27 PM, Shrikar archak shrika...@gmail.com wrote:

Hi All, I am trying to run the network count example as a separate standalone job and running into some issues.

Environment:
1) Mac Mavericks
2) Latest spark repo from GitHub.

I have a structure like this:

    Shrikars-MacBook-Pro:SimpleJob shrikar$ find .
    .
    ./simple.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/NetworkWordCount.scala
    ./src/main/scala/SimpleApp.scala.bk

simple.sbt:

    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.10.3"

    libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0-SNAPSHOT",
                                "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT")

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

I am able to run the SimpleApp which is mentioned in the doc, but when I try to run the NetworkWordCount app I get an error like this — am I missing something?

    [info] Running com.shrikar.sparkapps.NetworkWordCount
    14/05/22 14:26:47 INFO spark.SecurityManager: Changing view acls to: shrikar
    14/05/22 14:26:47 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(shrikar)
    14/05/22 14:26:48 INFO slf4j.Slf4jLogger: Slf4jLogger started
    14/05/22 14:26:48 INFO Remoting: Starting remoting
    14/05/22 14:26:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@192.168.10.88:49963]
    14/05/22 14:26:48 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@192.168.10.88:49963]
    14/05/22 14:26:48 INFO spark.SparkEnv: Registering MapOutputTracker
    14/05/22 14:26:48 INFO spark.SparkEnv: Registering BlockManagerMaster
    14/05/22 14:26:48 INFO storage.DiskBlockManager: Created local directory at /var/folders/r2/mbj08pb55n5d_9p8588xk5b0gn/T/spark-local-20140522142648-0a14
    14/05/22 14:26:48 INFO storage.MemoryStore: MemoryStore started with capacity 911.6 MB.
    14/05/22 14:26:48 INFO network.ConnectionManager: Bound socket to port 49964 with id = ConnectionManagerId(192.168.10.88,49964)
    14/05/22 14:26:48 INFO storage.BlockManagerMaster: Trying to register BlockManager
Re: Error related to serialisation in spark streaming
Thanks a lot for your reply. I can see the Kryo serialiser in the UI. I have one other query: I wanted to know the meaning of the following log message when running a Spark Streaming job:

    [spark-akka.actor.default-dispatcher-18] INFO org.apache.spark.streaming.scheduler.JobScheduler - Total delay: 5.432 s for time 1401870454500 ms (execution: 0.593 s)

According to my understanding, total delay here means the total end-to-end delay, which is 5.432 s. What is the meaning of "execution: 0.593"? Is it the time taken for executing this particular query? PS: I am running a streaming job over a window of 5 minutes, querying every 1.5 s.
Native library can not be loaded when using Mllib PCA
Hi,

We're using MLlib (1.0.0 release version) on a k-means clustering problem: we want to reduce the matrix column count before sending the points to the k-means solver. It works on my Mac in local mode. spark-test-run-assembly-1.0.jar contains my application code, the com.github.fommil netlib code, and the netlib-native*.so files (including the jnilib and dll files):

    spark-submit --class test.TestMllibPCA --master local[4] \
      --executor-memory 3g --driver-memory 3g \
      --driver-class-path /data/user/dump/spark-test-run-assembly-1.0.jar \
      /data/user/dump/spark-test-run-assembly-1.0.jar \
      /data/user/dump/user_fav_2014_04_09.csv.head1w

But if --driver-class-path is removed, warnings appear:

    14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
    14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

Setting SPARK_CLASSPATH=/data/user/dump/spark-test-run-assembly-1.0.jar can also solve the problem. The matrix contains sparse data with 6778 rows and 2487 columns, and the time to calculate the PCA is 10 s and 47 s respectively, which suggests that the native library works well when it loads.

Then I wanted to test it on a Spark standalone cluster (on CentOS), but it failed again. After changing the JDK logging level to FINEST, I got this:

    14/06/05 16:19:15 INFO JniLoader: JNI LIB = netlib-native_system-linux-x86_64.so
    14/06/05 16:19:15 INFO JniLoader: extracting jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_system-linux-x86_64.so to /tmp/jniloader6648403281987654682netlib-native_system-linux-x86_64.so
    14/06/05 16:19:15 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
    14/06/05 16:19:15 INFO JniLoader: JNI LIB = netlib-native_ref-linux-x86_64.so
    14/06/05 16:19:15 INFO JniLoader: extracting jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_ref-linux-x86_64.so to /tmp/jniloader2298588627398263902netlib-native_ref-linux-x86_64.so
    14/06/05 16:19:16 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
    14/06/05 16:19:16 INFO LAPACK: Implementation provided by class com.github.fommil.netlib.F2jLAPACK

libgfortran, atlas, blas, lapack, and arpack are all installed, and all of the .so files are located under /usr/lib64; spark.executor.extraLibraryPath is set to /usr/lib64 in conf/spark-defaults.conf, but none of this works. I tried adding --jars /data/user/dump/spark-test-run-assembly-1.0.jar, but no good news.

What should I try next? Does the native library need to be visible to both the driver and the executors? In local mode the problem seems to be a classpath problem, but for standalone and YARN mode it gets more complex. A detailed document would be really helpful.

Thanks.
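[For context, the PCA step being timed above looks roughly like this in the MLlib 1.0 API — a sketch; rows is assumed to be an RDD[Vector] with one row per point, and 50 is an arbitrary target dimension:]

    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    val mat = new RowMatrix(rows)
    // computePrincipalComponents is the call that goes through LAPACK,
    // hence the native-library warnings above.
    val pc = mat.computePrincipalComponents(50)
    val projected = mat.multiply(pc)   // points in the reduced space, for k-means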
How to shut down Spark Streaming with Kafka properly?
Hi, I am trying to use Spark Streaming with Kafka, which works like a charm -- except for shutdown. When I run my program with sbt run-main, sbt will never exit, because there are two non-daemon threads left that don't die. I created a minimal example at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala. It starts a StreamingContext and does nothing more than connecting to a Kafka server and printing what it receives. Using the `future { ... }` construct, I shut down the StreamingContext after some seconds and then print the difference between the threads at start time and at end time. The output can be found at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1. There are a number of threads remaining that will prevent sbt from exiting. When I replace `KafkaUtils.createStream(...)` with a call that does exactly the same, except that it calls `consumerConnector.shutdown()` in `KafkaReceiver.onStop()` (which it should, IMO), the output is as shown at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2. Does anyone have *any* idea what is going on here and why the program doesn't shut down properly? The behavior is the same with both kafka 0.8.0 and 0.8.1.1, by the way. Thanks Tobias
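[The shutdown pattern described above, in minimal form — a sketch assuming the gist's structure, not a copy of it. Per the report, the Kafka consumer threads survive this unless the receiver also shuts down its ConsumerConnector in onStop():]

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.concurrent.future
    import scala.concurrent.ExecutionContext.Implicits.global

    val ssc = new StreamingContext(new SparkConf(), Seconds(1))
    // ... KafkaUtils.createStream(...), print(), etc. ...
    ssc.start()

    future {
      Thread.sleep(10000)               // run for a while, then stop
      ssc.stop(stopSparkContext = true)
    }
    ssc.awaitTermination()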
Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
Hi,

I have written my own custom Spark Streaming code which connects to a Kafka server and fetches data. I have tested the code in local mode and it works fine. But when I execute the same code in YARN mode, I get a KafkaReceiver class-not-found exception. I am providing the Spark Kafka jar on the classpath and have ensured that the path is correct for all the nodes in my cluster. I am using the Spark 0.9.1 Hadoop pre-built distribution, deployed on all the nodes (a 10-node cluster) in the YARN cluster.

I am using the following command to run my code in YARN mode:

    SPARK_YARN_MODE=true \
    SPARK_JAR=assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar \
    SPARK_YARN_APP_JAR=/usr/local/SparkStreamExample.jar \
    java -cp /usr/local/SparkStreamExample.jar:assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar:/usr/local/kafka/kafka_2.10-0.8.1.1/libs/*:/usr/lib/hbase/lib/*:/etc/hadoop/conf/:/etc/hbase/conf/ \
    SparkStreamExample yarn-client 10.10.5.32 myFirstGroup testTopic NewTestTable 1

Below is the error message I am getting:

    14/06/05 04:29:12 INFO cluster.YarnClientClusterScheduler: Adding task set 2.0 with 1 tasks
    14/06/05 04:29:12 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 70 on executor 2: manny6.musigma.com (PROCESS_LOCAL)
    14/06/05 04:29:12 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 2971 bytes in 2 ms
    14/06/05 04:29:12 WARN scheduler.TaskSetManager: Lost TID 70 (task 2.0:0)
    14/06/05 04:29:12 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException
    java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
        at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:247)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1574)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1495)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1731)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1666)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1322)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:479)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:72)
        at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
        at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:145)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1791)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at
Re: Can this be done in map-reduce technique (in parallel)
Hi Cheng, Thanks a lot. That solved my problem. Thanks again for the quick response and solution.
Re: Spark not working with mesos
Hi Ajatix, Yes, the HADOOP_HOME is set on the nodes and I did update the bash config. As I said, adding MESOS_HADOOP_HOME did not work. But what is causing the original error: java.lang.Error: java.io.IOException: failure to login? -- Thanks
Serialization problem in Spark
Hi,

I am trying to do something like the following in Spark:

    JavaPairRDD<byte[], MyObject> eventRDD = hBaseRDD.map(
        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, byte[], MyObject>() {
          @Override
          public Tuple2<byte[], MyObject> call(
              Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
            return new Tuple2<byte[], MyObject>(
                immutableBytesWritableResultTuple2._1.get(),
                MyClass.get(immutableBytesWritableResultTuple2._2));
          }
        });

    eventRDD.foreach(new VoidFunction<Tuple2<byte[], Event>>() {
      @Override
      public void call(Tuple2<byte[], Event> eventTuple2) throws Exception {
        processForEvent(eventTuple2._2);
      }
    });

The processForEvent() flow contains some processing and ultimately writes to an HBase table. But I am getting serialisation issues with the Hadoop and HBase built-in classes. How do I solve this? Does using Kryo serialisation help in this case?

Thanks, -Vibhor
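[If you want to try Kryo, the wiring looks roughly like this — a sketch, where MyKryoRegistrator is a hypothetical class you would write to register MyObject and friends. Whether it helps depends on what is actually failing to serialize: if the closure is capturing a live Hadoop/HBase handle such as a Configuration or an HTable, the usual fix is to create that handle inside the function (for example, once per partition) rather than to serialize it.]

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.example.MyKryoRegistrator") // hypothetical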
Problem with serialization and deserialization
Hi, I have a JTree. I want to serialize it using sc.saveAsObjectFile(path), and I could save it to some location. The real problem is that when I deserialize it back using sc.objectFile(), I am not getting the JTree back. Can anyone please help me with this? Thanks
Re: Problem with serialization and deserialization
Dear Aneesh, Your particular use case of using Swing GUI components with Spark is a bit unclear to me. Assuming that you want Spark to operate on a tree object, you could use an implementation of the TreeModel (http://docs.oracle.com/javase/8/docs/api/javax/swing/tree/DefaultTreeModel.html) used internally by the JTree. This class is serialisable, whereas JTree is not; this might be one of the causes of your problems when trying to serialise the JTree (educated guess).

---
Kind regards,
Stefan van Wouw

On 05 Jun 2014, at 13:47, ANEESH .V.V aneeshnair.ku...@gmail.com wrote:

Hi, I have a JTree. I want to serialize it using sc.saveAsObjectFile(path), and I could save it to some location. The real problem is that when I deserialize it back using sc.objectFile(), I am not getting the JTree back. Can anyone please help me with this? Thanks
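[A hedged sketch of Stefan's suggestion — paths and variable names are illustrative, and the user objects stored in the tree's nodes must themselves be serialisable: persist the model rather than the JTree, and rebuild the widget after loading.]

    import javax.swing.JTree
    import javax.swing.tree.DefaultTreeModel

    // tree: JTree — save the underlying model, which implements Serializable.
    val model = tree.getModel.asInstanceOf[DefaultTreeModel]
    sc.parallelize(Seq(model)).saveAsObjectFile("hdfs:///tmp/treemodel")

    // Later: load the model back and wrap it in a fresh JTree.
    val restored = sc.objectFile[DefaultTreeModel]("hdfs:///tmp/treemodel").first()
    val rebuilt = new JTree(restored)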
Re: Better line number hints for logging?
On Wed, Jun 4, 2014 at 10:39 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

That's a good idea too; maybe we can change CallSiteInfo to do that.

I've filed an issue: https://issues.apache.org/jira/browse/SPARK-2035

Matei

On Jun 4, 2014, at 8:44 AM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote:

Oh, this would be super useful for us too! Actually, wouldn't it be best if you could see the whole call stack in the UI, rather than just one line? (Of course you would have to click to expand it.)

On Wed, Jun 4, 2014 at 2:38 AM, John Salvatier jsalvat...@gmail.com wrote:

Ok, I will probably open a JIRA.

On Tue, Jun 3, 2014 at 5:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

You can use RDD.setName to give it a name. There's also a creationSite field that is private[spark] — we may want to add a public setter for that later. If the name isn't enough and you'd like this, please open a JIRA issue for it. Matei

On Jun 3, 2014, at 5:22 PM, John Salvatier jsalvat...@gmail.com wrote:

I have created some extension methods for RDDs in RichRecordRDD, and these are working exceptionally well for me. However, when looking at the logs, it's impossible to tell what's going on, because all the line-number hints point to RichRecordRDD.scala rather than the code that uses it. For example:

    INFO scheduler.DAGScheduler: Submitting Stage 122 (MappedRDD[1223] at map at RichRecordRDD.scala:633), which is now runnable

Is there any way to set up my extension-methods class so that the logs will print a more useful line number?
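[In the meantime, the existing hook Matei mentions covers the common case. A small sketch with made-up names — setName labels the derived RDD, and the name is prefixed to the RDD's description in scheduler log lines and the storage UI, even though the call site still points into the extension-method file:]

    // Inside an extension method in RichRecordRDD.scala:
    val enriched = records
      .map(enrich)                // `enrich` stands in for the real logic
      .setName("enrichedRecords") // shows up in the stage description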
Re: Serialization problem in Spark
Any inputs on this will be helpful. Thanks, -Vibhor

On Thu, Jun 5, 2014 at 3:41 PM, Vibhor Banga vibhorba...@gmail.com wrote:

Hi, I am trying to do something like the following in Spark:

    JavaPairRDD<byte[], MyObject> eventRDD = hBaseRDD.map(
        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, byte[], MyObject>() {
          @Override
          public Tuple2<byte[], MyObject> call(
              Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
            return new Tuple2<byte[], MyObject>(
                immutableBytesWritableResultTuple2._1.get(),
                MyClass.get(immutableBytesWritableResultTuple2._2));
          }
        });

    eventRDD.foreach(new VoidFunction<Tuple2<byte[], Event>>() {
      @Override
      public void call(Tuple2<byte[], Event> eventTuple2) throws Exception {
        processForEvent(eventTuple2._2);
      }
    });

The processForEvent() flow contains some processing and ultimately writes to an HBase table. But I am getting serialisation issues with the Hadoop and HBase built-in classes. How do I solve this? Does using Kryo serialisation help in this case?

Thanks, -Vibhor

--
Vibhor Banga
Software Development Engineer
Flipkart Internet Pvt. Ltd., Bangalore
Re: Spark Streaming not processing file with particular number of entries
The same issue persists in spark-1.0.0 as well (I was using 0.9.1 earlier). Any suggestions are welcome. -- Thanks
spark worker and yarn memory
I am slightly confused about the --executor-memory setting. My YARN cluster has a maximum container memory of 8192 MB. When I specify --executor-memory 8G in my spark-shell, no container can be started at all; it only works when I lower the executor memory to 7G. But then, on YARN, I see 2 containers per node, using 16 GB of memory. And on the Spark UI, it shows that each worker has 4 GB of memory, rather than 7. Can someone explain the relationship among the numbers I see here? Thanks.
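[For what it's worth, the arithmetic that usually explains these numbers — hedged, since defaults varied by version: YARN sizes each container at the executor heap plus an off-heap overhead and rounds the request up to its allocation increment, while the Spark UI reports only the fraction of the heap reserved for caching, not the whole heap.]

    // Back-of-the-envelope, assuming the 1.0-era defaults named below:
    val containerMax = 8192               // MB, yarn.scheduler.maximum-allocation-mb
    val overhead     = 384                // MB, assumed spark.yarn.executor.memoryOverhead
    // 8192 + 384 > 8192  -> an 8G executor can never fit, so no container starts.
    // 7168 + 384 <= 8192 -> a 7G executor fits (and may be rounded up to 8192 MB,
    //                       which would make two containers show as 16 GB).
    val storageShown = 7168 * 0.6 / 1024  // spark.storage.memoryFraction = 0.6
    // ~4.2 GB -> roughly the "4 GB" per worker shown on the UI.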
Loading Python libraries into Spark
Hi, I am new to Spark (and almost new to Python!). How can I download and install a Python library on my cluster so that I can just import it later? Any help would be much appreciated. Thanks!
compress in-memory cache?
I have a working set larger than available memory, so I am hoping to turn on RDD compression in order to store more in memory. Strangely, it made no difference: the number of cached partitions, the fraction cached, and the size in memory all remain the same. Any ideas? I confirmed that RDD compression wasn't on before and was on for the second test:

    scala> sc.getConf.getAll foreach println
    ...
    (spark.rdd.compress,true)
    ...

I haven't tried lzo vs snappy, but my guess is that either one should provide at least some benefit. Thanks. -Simon
Re: compress in-memory cache?
Have you set the persistence level of the RDD to MEMORY_ONLY_SER (http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence)? If you're calling cache(), the default persistence level is MEMORY_ONLY, so that setting will have no impact.

On Thu, Jun 5, 2014 at 4:41 PM, Xu (Simon) Chen xche...@gmail.com wrote:

I have a working set larger than available memory, so I am hoping to turn on RDD compression in order to store more in memory. Strangely, it made no difference: the number of cached partitions, the fraction cached, and the size in memory all remain the same. Any ideas? I confirmed that RDD compression wasn't on before and was on for the second test:

    scala> sc.getConf.getAll foreach println
    ...
    (spark.rdd.compress,true)
    ...

I haven't tried lzo vs snappy, but my guess is that either one should provide at least some benefit. Thanks. -Simon
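[Concretely, the change Nick describes — a sketch, where the load/parse step stands in for whatever builds the RDD:]

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.textFile("hdfs:///data/input").map(parse)  // `parse` is hypothetical
    // Store serialized bytes so spark.rdd.compress has something to compress;
    // cache() alone would store deserialized objects and ignore the setting.
    rdd.persist(StorageLevel.MEMORY_ONLY_SER)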
Scala By the Bay Developer Conference and Training Registration
Scala By the Bay registration and training are now open! We are assembling a great two-day program for Scala By the Bay (www.scalabythebay.org) — the yearly SF Scala developer conference. This year the conference itself is on August 8-9 in Fort Mason, near the Golden Gate Bridge, with Scala training on August 6-7 and Apache Spark training on August 11-12.

We have key companies using Scala as sponsors and partners, and many great talk submissions already in the pipeline, including Play and Akka. We provide pre-conference training, Scala Foundations, and post-conference training, Fast Track to Spark. Each training is a two-day intensive, hands-on course. The training and registration tickets have early-bird discounts of about $200 each, so e.g. on the all-three package the savings would be $600. Early-bird pricing ends on July 2nd. Training capacity is limited to 25 seats in each training session.

The keynotes are by Marius Eriksen, Principal Engineer at Twitter, the lead on Finagle and other Twitter Stack components, and Matei Zaharia, CTO of Databricks, the lead on Apache Spark and other Berkeley Stack components. There will be great lunch and dinner presentations, and we plan to bring the Off the Grid trucks for awesome food, as well as create other opportunities afforded By the Bay to enjoy our spectacular location and community.

We are still looking for more great talk submissions, both full-length and lightning. The CFP is open through Friday the 13th, and the registration fee for the authors of accepted full-length talks will be waived. We will open up voting for registered attendees once we have the majority on hand. The program will be composed based on the votes from the attendees, the reviewers, and the organizers.

We hope to see you at Fort Mason in August!

Alexy Khrabrov, Jason Swartz, and the Organizing Committee of Scala By the Bay
Re: Unable to run a Standalone job([NOT FOUND ] org.eclipse.jetty.orbit#javax.mail.glassfish;1.4.1.v201005082020)
Hi Prabeesh / Sean,

I tried both the steps you guys mentioned; it looks like it's still not able to resolve these:

    [warn] [NOT FOUND ] org.eclipse.jetty.orbit#javax.transaction;1.1.1.v201105210645!javax.transaction.orbit (131ms)
    [warn] public: tried
    [warn] http://repo1.maven.org/maven2/org/eclipse/jetty/orbit/javax.transaction/1.1.1.v201105210645/javax.transaction-1.1.1.v201105210645.orbit
    [warn] [NOT FOUND ] org.eclipse.jetty.orbit#javax.servlet;3.0.0.v201112011016!javax.servlet.orbit (225ms)
    [warn] public: tried
    [warn] http://repo1.maven.org/maven2/org/eclipse/jetty/orbit/javax.servlet/3.0.0.v201112011016/javax.servlet-3.0.0.v201112011016.orbit
    [warn] [NOT FOUND ] org.eclipse.jetty.orbit#javax.mail.glassfish;1.4.1.v201005082020!javax.mail.glassfish.orbit (214ms)
    [warn] public: tried
    [warn] http://repo1.maven.org/maven2/org/eclipse/jetty/orbit/javax.mail.glassfish/1.4.1.v201005082020/javax.mail.glassfish-1.4.1.v201005082020.orbit
    [warn] [NOT FOUND ] org.eclipse.jetty.orbit#javax.activation;1.1.0.v201105071233!javax.activation.orbit (112ms)
    [warn] public: tried

Thanks, Shrikar

On Thu, Jun 5, 2014 at 1:27 AM, prabeesh k prabsma...@gmail.com wrote:

Try the "sbt clean" command before building the app, or delete the .ivy2 and .sbt folders (not a good method). Then try to rebuild the project.

On Thu, Jun 5, 2014 at 11:45 AM, Sean Owen so...@cloudera.com wrote:

I think this is SPARK-1949 again: https://github.com/apache/spark/pull/906 I think this change fixed this issue for a few people using the SBT build — worth committing?

On Thu, Jun 5, 2014 at 6:40 AM, Shrikar archak shrika...@gmail.com wrote:

Hi All, Now that Spark version 1.0.0 is released, there should not be any problem with the local jars.

    Shrikars-MacBook-Pro:SimpleJob shrikar$ cat simple.sbt
    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0",
                                "org.apache.spark" %% "spark-streaming" % "1.0.0")

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

I am still having this issue:

    [error] (run-main) java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
    java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
        at org.apache.spark.HttpServer.start(HttpServer.scala:54)
        at org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
        at org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
        at org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
        at org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
        at org.apache.spark.broadcast.BroadcastManager.<init>(BroadcastManager.scala:35)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:202)

Any help would be greatly appreciated. Thanks, Shrikar

On Fri, May 23, 2014 at 3:58 PM, Shrikar archak shrika...@gmail.com wrote:

Still the same error, no change. Thanks, Shrikar

On Fri, May 23, 2014 at 2:38 PM, Jacek Laskowski ja...@japila.pl wrote:

Hi Shrikar, How did you build Spark 1.0.0-SNAPSHOT on your machine? My understanding is that `sbt publishLocal` is not enough and you really need `sbt assembly` instead. Give it a try and report back. As to your build.sbt, upgrade Scala to 2.10.4 and use "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT" only — that will pull down spark-core as a transitive dep. The resolver for the Akka Repository is not needed. Your build.sbt should really look as follows:

    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.10.4"

    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT"

Jacek

On Thu, May 22, 2014 at 11:27 PM, Shrikar archak shrika...@gmail.com wrote:

Hi All, I am trying to run the network count example as a separate standalone job and running into some issues.

Environment:
1) Mac Mavericks
2) Latest spark repo from GitHub.

I have a structure like this:

    Shrikars-MacBook-Pro:SimpleJob shrikar$ find .
    .
    ./simple.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/NetworkWordCount.scala
    ./src/main/scala/SimpleApp.scala.bk

simple.sbt:

    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.10.3"

    libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0-SNAPSHOT",
                                "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT")

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

I am able to run the SimpleApp which is mentioned in the doc, but when I try to run the NetworkWordCount app I get an error like this — am I missing something?

    [info] Running com.shrikar.sparkapps.NetworkWordCount
    14/05/22 14:26:47 INFO spark.SecurityManager:
Re: reuse hadoop code in Spark
Thanks Matei. Using your pointers I can import data from HDFS. What I want to do now is something like this in Spark:

    import myown.mapper

    rdd.map(mapper.map)

The reason why I want this: myown.mapper is a Java class I have already developed. I used to run it in Hadoop. It is fairly complex and relies on a lot of utility Java classes I wrote. Can I reuse the map function written in Java and port it into Spark?

Best regards, Wei

---
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan

From: Matei Zaharia matei.zaha...@gmail.com
To: user@spark.apache.org
Date: 06/04/2014 04:28 PM
Subject: Re: reuse hadoop code in Spark

Yes, you can write some glue in Spark to call these. Some functions to look at:

- SparkContext.hadoopRDD lets you create an input RDD from an existing JobConf configured by Hadoop (including InputFormat, paths, etc.)
- RDD.mapPartitions lets you operate on all the values in one partition (block) at a time, similar to how Mappers in MapReduce work
- PairRDDFunctions.reduceByKey and groupByKey can be used for aggregation
- RDD.pipe() can be used to call out to a script or binary, like Hadoop Streaming

A fair number of people have been running both Java and Hadoop Streaming apps like this. Matei

On Jun 4, 2014, at 1:08 PM, Wei Tan w...@us.ibm.com wrote:

Hello, I am trying to use Spark in such a scenario: I have code written for Hadoop and now I am trying to migrate to Spark. The mappers and reducers are fairly complex, so I wonder if I can reuse the map() functions I already wrote in Hadoop (Java), and use Spark to chain them, mixing the Java map() functions with Spark operators? Another related question: can I use binaries as operators, like Hadoop Streaming? Thanks! Wei
Re: Native library can not be loaded when using Mllib PCA
For standalone and YARN mode, you need to install the native libraries on all nodes. The best solution is installing them to /usr/lib/libblas.so.3 and /usr/lib/liblapack.so.3. If your matrix is sparse, the native libraries cannot help, because they are for dense linear algebra. You can create an RDD of sparse rows and try k-means directly; it supports sparse input. -Xiangrui

Sent from my iPad

On Jun 5, 2014, at 2:36 AM, yangliuyu yangli...@163.com wrote:

Hi, We're using MLlib (1.0.0 release version) on a k-means clustering problem: we want to reduce the matrix column count before sending the points to the k-means solver. It works on my Mac in local mode. spark-test-run-assembly-1.0.jar contains my application code, the com.github.fommil netlib code, and the netlib-native*.so files (including the jnilib and dll files):

    spark-submit --class test.TestMllibPCA --master local[4] \
      --executor-memory 3g --driver-memory 3g \
      --driver-class-path /data/user/dump/spark-test-run-assembly-1.0.jar \
      /data/user/dump/spark-test-run-assembly-1.0.jar \
      /data/user/dump/user_fav_2014_04_09.csv.head1w

But if --driver-class-path is removed, warnings appear:

    14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
    14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

Setting SPARK_CLASSPATH=/data/user/dump/spark-test-run-assembly-1.0.jar can also solve the problem. The matrix contains sparse data with 6778 rows and 2487 columns, and the time to calculate the PCA is 10 s and 47 s respectively, which suggests that the native library works well when it loads.

Then I wanted to test it on a Spark standalone cluster (on CentOS), but it failed again. After changing the JDK logging level to FINEST, I got this:

    14/06/05 16:19:15 INFO JniLoader: JNI LIB = netlib-native_system-linux-x86_64.so
    14/06/05 16:19:15 INFO JniLoader: extracting jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_system-linux-x86_64.so to /tmp/jniloader6648403281987654682netlib-native_system-linux-x86_64.so
    14/06/05 16:19:15 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
    14/06/05 16:19:15 INFO JniLoader: JNI LIB = netlib-native_ref-linux-x86_64.so
    14/06/05 16:19:15 INFO JniLoader: extracting jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_ref-linux-x86_64.so to /tmp/jniloader2298588627398263902netlib-native_ref-linux-x86_64.so
    14/06/05 16:19:16 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
    14/06/05 16:19:16 INFO LAPACK: Implementation provided by class com.github.fommil.netlib.F2jLAPACK

libgfortran, atlas, blas, lapack, and arpack are all installed, and all of the .so files are located under /usr/lib64; spark.executor.extraLibraryPath is set to /usr/lib64 in conf/spark-defaults.conf, but none of this works. I tried adding --jars /data/user/dump/spark-test-run-assembly-1.0.jar, but no good news.

What should I try next? Does the native library need to be visible to both the driver and the executors? In local mode the problem seems to be a classpath problem, but for standalone and YARN mode it gets more complex. A detailed document would be really helpful.

Thanks.
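[Xiangrui's sparse route, sketched — the path, line format, and settings are made up for illustration; MLlib's k-means accepts sparse vectors directly, so the PCA/densification step can be dropped:]

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    // Assume each line holds space-separated "index:value" pairs for one point.
    val points = sc.textFile("hdfs:///data/points").map { line =>
      val elems = line.split(' ').map { t =>
        val Array(i, v) = t.split(':')
        (i.toInt, v.toDouble)
      }
      Vectors.sparse(2487, elems)   // 2487 columns, per the original post
    }.cache()

    val model = KMeans.train(points, 10, 20)  // k = 10 clusters, 20 iterations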
Re: compress in-memory cache?
Thanks.. it works now. -Simon On Thu, Jun 5, 2014 at 10:47 AM, Nick Pentreath nick.pentre...@gmail.com wrote: Have you set the persistence level of the RDD to MEMORY_ONLY_SER (http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence)? If you're calling cache, the default persistence level is MEMORY_ONLY, so that setting will have no impact. On Thu, Jun 5, 2014 at 4:41 PM, Xu (Simon) Chen xche...@gmail.com wrote: I have a working set larger than available memory, thus I am hoping to turn on RDD compression so that I can store more in memory. Strangely it made no difference. The number of cached partitions, fraction cached, and size in memory remain the same. Any ideas? I confirmed that RDD compression wasn't on before and that it was on for the second test. scala> sc.getConf.getAll foreach println ... (spark.rdd.compress,true) ... I haven't tried lzo vs snappy, but my guess is that either one should provide at least some benefit. Thanks. -Simon
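A minimal sketch of Nick's suggestion for the spark-shell session above (the input path is hypothetical):

    import org.apache.spark.storage.StorageLevel

    // spark.rdd.compress only applies to serialized storage levels, so use
    // MEMORY_ONLY_SER instead of the MEMORY_ONLY that plain cache() gives you.
    val rdd = sc.textFile("hdfs:///some/input")
    rdd.persist(StorageLevel.MEMORY_ONLY_SER)
    rdd.count()  // materializes the cache; compare sizes in the UI's Storage tab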
Re: Loading Python libraries into Spark
Hi Andrei, Thank you for your help! Just to make sure I understand, when I run this command sc.addPyFile("/path/to/yourmodule.py"), I need to be already logged into the master node and have my python files somewhere, is that correct? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Loading-Python-libraries-into-Spark-tp7059p7073.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: reuse hadoop code in Spark
Use RDD.mapPartitions to go over all the items in a partition with one Mapper object. It will look something like this:

    rdd.mapPartitions { iterator =>
      val mapper = new myown.Mapper()
      mapper.configure(conf)
      val results = new scala.collection.mutable.ArrayBuffer[(OutKey, OutValue)]()
      // an OutputCollector that stores whatever the mapper emits in the buffer
      val output = new OutputCollector[OutKey, OutValue] {
        def collect(key: OutKey, value: OutValue) { results += ((key, value)) }
      }
      for ((key, value) <- iterator) {
        mapper.map(key, value, output, Reporter.NULL)
      }
      results.iterator
    }

(where OutKey and OutValue stand for your mapper's output types)
On Jun 5, 2014, at 8:12 AM, Wei Tan w...@us.ibm.com wrote: Thanks Matei. Using your pointers I can import data from HDFS. What I want to do now is something like this in Spark: --- import myown.mapper rdd.map(mapper.map) --- The reason why I want this: myown.mapper is a Java class I already developed. I used to run it in Hadoop. It is fairly complex and relies on a lot of utility Java classes I wrote. Can I reuse the map function in Java and port it into Spark? Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan From: Matei Zaharia matei.zaha...@gmail.com To: user@spark.apache.org Date: 06/04/2014 04:28 PM Subject: Re: reuse hadoop code in Spark Yes, you can write some glue in Spark to call these. Some functions to look at: - SparkContext.hadoopRDD lets you create an input RDD from an existing JobConf configured by Hadoop (including InputFormat, paths, etc.) - RDD.mapPartitions lets you operate on all the values in one partition (block) at a time, similar to how Mappers in MapReduce work - PairRDDFunctions.reduceByKey and groupByKey can be used for aggregation. - RDD.pipe() can be used to call out to a script or binary, like Hadoop Streaming. A fair number of people have been running both Java and Hadoop Streaming apps like this. Matei On Jun 4, 2014, at 1:08 PM, Wei Tan w...@us.ibm.com wrote: Hello, I am trying to use Spark in the following scenario: I have code written in Hadoop and now I am trying to migrate to Spark. The mappers and reducers are fairly complex. So I wonder if I can reuse the map() functions I already wrote in Hadoop (Java), and use Spark to chain them, mixing the Java map() functions with Spark operators? Another related question: can I use binaries as operators, like Hadoop Streaming? Thanks! Wei
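To feed such a mapper, the input RDD can come straight from an existing Hadoop job configuration via SparkContext.hadoopRDD, e.g. (a hedged sketch; the path is hypothetical and it assumes the old mapred API with TextInputFormat):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

    val jobConf = new JobConf()  // reuse whatever settings your Hadoop job already has
    FileInputFormat.setInputPaths(jobConf, "hdfs:///some/input")
    val input = sc.hadoopRDD(jobConf, classOf[TextInputFormat],
                             classOf[LongWritable], classOf[Text])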
Re: Loading Python libraries into Spark
In my answer I assumed you run your program with the pyspark command (e.g. pyspark mymainscript.py; pyspark should be on your path). In this case the workflow is as follows:
1. You create a SparkConf object that simply contains your app's options.
2. You create a SparkContext, which initializes your application. At this point the application connects to the master and asks for resources.
3. You modify the SparkContext object to include everything you want to make available to mappers on other hosts, e.g. other *.py files.
4. You create an RDD (e.g. with sc.textFile) and run the actual commands (e.g. map, filter, etc.). SparkContext knows about your additional files, so these commands are aware of your library code.
So, yes, in these settings you need to create sc (the SparkContext object) beforehand and make the *.py files available on the application's host. With the pyspark shell you already have the sc object initialized for you (try running pyspark and typing sc + Enter; the shell will print the Spark context details). You can also use spark-submit [1], which will initialize the SparkContext from command line options. But essentially the idea is always the same: there's a driver application running on one host that creates the SparkContext, collects dependencies, controls program flow, etc., and there are workers, applications on slave hosts, that use the created SparkContext and all serialized data to perform the driver's commands. The driver should know about everything and let the workers know about what they need to know (e.g. your library code). [1]: http://spark.apache.org/docs/latest/submitting-applications.html On Thu, Jun 5, 2014 at 8:10 PM, mrm ma...@skimlinks.com wrote: Hi Andrei, Thank you for your help! Just to make sure I understand, when I run this command sc.addPyFile("/path/to/yourmodule.py"), I need to be already logged into the master node and have my python files somewhere, is that correct? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Loading-Python-libraries-into-Spark-tp7059p7073.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
creating new ami image for spark ec2 commands
How would I go about creating a new AMI image that I can use with the spark ec2 commands? I can't seem to find any documentation. I'm looking for a list of steps that I'd need to perform to make an Amazon Linux image ready to be used by the spark ec2 tools. I've been reading through the spark 1.0.0 documentation, looking at the script itself (spark_ec2.py), and looking at the github project mesos/spark-ec2. From what I can tell, the spark_ec2.py script looks up the id of the AMI based on the region and machine type (hvm or pvm) using static content derived from the github repo mesos/spark-ec2. The spark ec2 script loads the AMI id from this base url: https://raw.github.com/mesos/spark-ec2/v2/ami-list (Which presumably comes from https://github.com/mesos/spark-ec2 ) For instance, I'm working with us-east-1 and pvm, I'd end up with AMI id: ami-5bb18832 Is there a list of instructions for how this AMI was created? Assuming I'm starting with my own Amazon Linux image, what would I need to do to make it usable where I could pass that AMI id to spark_ec2.py rather than using the default spark-provided AMI? Thanks, Matt
Examples
Hi, I’m still having trouble running the CassandraTest example from the Spark-1.0.0 binary package. I’ve made a Stackoverflow question for it so you can get some street cred for helping me :) http://stackoverflow.com/q/24069039/503826 Thanks! Tim Kellogg Sr. Software Engineer, Protocols 2lemetry 605-593-7099 @kellogh
Setting executor memory when using spark-shell
Hi All, Please help me set the executor JVM memory size. I am using Spark shell and it appears that the executors are started with a predefined JVM heap of 512m as soon as Spark shell starts. How can I change this setting? I tried setting SPARK_EXECUTOR_MEMORY before launching Spark shell: export SPARK_EXECUTOR_MEMORY=1g I also tried several other approaches: 1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker 2) passing it as the -m argument and running bin/start-slave.sh 1 -m 1g on the worker Thank you, Oleg
Re: Setting executor memory when using spark-shell
Hi Oleg, I set the size of my executors on a standalone cluster when using the shell like this: ./bin/spark-shell --master $MASTER --total-executor-cores $CORES_ACROSS_CLUSTER --driver-java-options -Dspark.executor.memory=$MEMORY_PER_EXECUTOR It doesn't seem particularly clean, but it works. Andrew On Thu, Jun 5, 2014 at 2:15 PM, Oleg Proudnikov oleg.proudni...@gmail.com wrote: Hi All, Please help me set the executor JVM memory size. I am using Spark shell and it appears that the executors are started with a predefined JVM heap of 512m as soon as Spark shell starts. How can I change this setting? I tried setting SPARK_EXECUTOR_MEMORY before launching Spark shell: export SPARK_EXECUTOR_MEMORY=1g I also tried several other approaches: 1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker 2) passing it as the -m argument and running bin/start-slave.sh 1 -m 1g on the worker Thank you, Oleg
Spark Streaming, download a s3 file to run a script shell on it
Hi, I've got a weird question, but maybe someone has already dealt with it. My Spark Streaming application needs to:
- download a file from an S3 bucket,
- run a script with the file as input,
- create a DStream from this script's output.
I've already got the second part done with the rdd.pipe() API, which really fits my needs, but I have no idea how to manage the first part. How can I manage to download a file and run a script on it inside a Spark Streaming application? Should I use Scala's process(), or won't that work? Thanks Gianluca
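For the first part, one option is to fetch the file with Hadoop's FileSystem API before wiring up the pipe (a hedged sketch; bucket, key, and script names are hypothetical, and it assumes s3n credentials are configured in the Hadoop configuration):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Copy s3n://my-bucket/input.dat down to the local filesystem.
    val fs = FileSystem.get(new URI("s3n://my-bucket"), new Configuration())
    fs.copyToLocalFile(new Path("s3n://my-bucket/input.dat"), new Path("/tmp/input.dat"))

    // Then feed it to the already-working pipe step, e.g.:
    // dstream.transform(rdd => rdd.pipe("./my_script.sh /tmp/input.dat"))

Note that the pipe runs on the executors, so the local file must be visible on those nodes, not just on the driver.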
Re: Setting executor memory when using spark-shell
Thank you, Andrew, I am using Spark 0.9.1 and tried your approach like this: bin/spark-shell --driver-java-options -Dspark.executor.memory=$MEMORY_PER_EXECUTOR I get bad option: '--driver-java-options' There must be something different in my setup. Any ideas? Thank you again, Oleg On 5 June 2014 22:28, Andrew Ash and...@andrewash.com wrote: Hi Oleg, I set the size of my executors on a standalone cluster when using the shell like this: ./bin/spark-shell --master $MASTER --total-executor-cores $CORES_ACROSS_CLUSTER --driver-java-options -Dspark.executor.memory=$MEMORY_PER_EXECUTOR It doesn't seem particularly clean, but it works. Andrew On Thu, Jun 5, 2014 at 2:15 PM, Oleg Proudnikov oleg.proudni...@gmail.com wrote: Hi All, Please help me set Executor JVM memory size. I am using Spark shell and it appears that the executors are started with a predefined JVM heap of 512m as soon as Spark shell starts. How can I change this setting? I tried setting SPARK_EXECUTOR_MEMORY before launching Spark shell: export SPARK_EXECUTOR_MEMORY=1g I also tried several other approaches: 1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker 2) passing it as -m argument and running bin/start-slave.sh 1 -m 1g on the worker Thank you, Oleg -- Kind regards, Oleg
Re: implicit ALS dataSet
Thank you for your quick reply. As far as I know, the update does not require negative observations, because the update rule x_u = (Y^T C_u Y + λI)^-1 Y^T C_u p(u) can be simplified by taking advantage of its algebraic structure, so negative observations are not needed. This is what I thought the first time I read the paper. What makes me confused is that, after that, the paper (in the Discussion section) says: Unlike explicit datasets, here *the model should take all user-item preferences as an input, including those which are not related to any input observation (thus hinting to a zero preference).* This is crucial, as the given observations are inherently biased towards a positive preference, and thus do not reflect well the user profile. However, taking all user-item values as an input to the model raises serious scalability issues – the number of all those pairs tends to significantly exceed the input size since a typical user would provide feedback only on a small fraction of the available items. We address this by exploiting the algebraic structure of the model, leading to an algorithm that scales linearly with the input size *while addressing the full scope of user-item pairs* without resorting to any sub-sampling. If my understanding is right, it seems that we need negative observations as input, but we don't use them during the update. That seems strange to me, because it would generate too many user-item pairs, which is not feasible. Thanks for the confirmation. I will read the ALS implementation for more details. Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/implicit-ALS-dataSet-tp7067p7086.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: implicit ALS dataSet
On Thu, Jun 5, 2014 at 10:38 PM, redocpot julien19890...@gmail.com wrote: can be simplified by taking advantage of its algebraic structure, so negative observations are not needed. This is what I thought the first time I read the paper. Correct; a big part of the reason this is efficient is the sparsity of the input. What makes me confused is that, after that, the paper (in the Discussion section) says: Unlike explicit datasets, here *the model should take all user-item preferences as an input, including those which are not related to any input It is not saying that these non-observations (I would not call them negative) should explicitly appear in the input. But their implicit existence can and should be used in the math. In particular, the loss function being minimized also penalizes error in the implicit 0 cells of the input, just with much less weight.
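For reference, the objective from the Hu, Koren, and Volinsky paper under discussion, restated from memory (worth double-checking against the paper), makes Sean's point explicit:

    \min_{X, Y} \sum_{u,i} c_{ui} \left( p_{ui} - x_u^\top y_i \right)^2
      + \lambda \left( \sum_u \|x_u\|^2 + \sum_i \|y_i\|^2 \right),
    \qquad c_{ui} = 1 + \alpha \, r_{ui}

where p_{ui} = 1 if r_{ui} > 0 and 0 otherwise. The sum formally ranges over *all* user-item pairs, but every unobserved pair has r_{ui} = 0 and hence the minimal confidence c_{ui} = 1; the algebraic trick (precomputing Y^T Y once per sweep) means those pairs never have to appear in the input.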
Re: spark worker and yarn memory
Hi Xu, As crazy as it might sound, this all makes sense. There are a few different quantities at play here:
* the heap size of the executor (controlled by --executor-memory)
* the amount of memory Spark requests from YARN (the heap size plus 384 MB to account for fixed memory costs outside of the heap)
* the amount of memory YARN grants to the container (YARN rounds up to the nearest multiple of yarn.scheduler.minimum-allocation-mb or yarn.scheduler.fair.increment-allocation-mb, depending on the scheduler used)
* the amount of memory Spark uses for caching on each executor, which is spark.storage.memoryFraction (default 0.6) of the executor heap size
So, with --executor-memory 8g, Spark requests 8g + 384m from YARN, which doesn't fit into its container max. With --executor-memory 7g, Spark requests 7g + 384m from YARN, which fits into its container max. This gets rounded up to 8g by the YARN scheduler. 7g is still used as the executor heap size, and 0.6 of this is about 4g, shown as the cache space in the Spark UI. -Sandy On Jun 5, 2014, at 9:44 AM, Xu (Simon) Chen xche...@gmail.com wrote: I am slightly confused about the --executor-memory setting. My YARN cluster has a maximum container memory of 8192MB. When I specify --executor-memory 8G in my spark-shell, no container can be started at all. It only works when I lower the executor memory to 7G. But then, on YARN, I see 2 containers per node, using 16G of memory. Then on the Spark UI, it shows that each worker has 4GB of memory, rather than 7. Can someone explain the relationship among the numbers I see here? Thanks.
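The same arithmetic, worked through for this case (assuming the 8192 MB container max from the original post and the default spark.storage.memoryFraction of 0.6):

    --executor-memory 8g: request = 8192 + 384 = 8576 MB > 8192 MB  -> rejected by YARN
    --executor-memory 7g: request = 7168 + 384 = 7552 MB <= 8192 MB -> accepted,
                          then rounded up to the 8192 MB increment  -> 8 GB per container
    cache per executor   = 0.6 * 7168 MB ≈ 4300 MB                  -> the ~4 GB in the UI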
Re: Join : Giving incorrect result
Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in the way join tasks spill to disk (which happened when you had more concurrent tasks competing for memory). I’ve posted a patch for it here: https://github.com/apache/spark/pull/986. Feel free to try that if you’d like; it will also be in 0.9.2 and 1.0.1. Matei
On Jun 5, 2014, at 12:19 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote: Sorry for replying late. It was night here. Lian/Matei, Here is the code snippet:

    sparkConf.set("spark.executor.memory", "10g")
    sparkConf.set("spark.cores.max", "5")
    val sc = new SparkContext(sparkConf)
    val accId2LocRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_, 0, ',', true))
    val accId2DemoRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_, 0, ',', true))
    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

    def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, retFullLine: Boolean): Tuple2[String, String] = {
      val splits = line.split(delimit)
      if (splits.length <= 1) {
        (null, null)
      } else if (retFullLine) {
        (splits(keyIndex), line)
      } else {
        (splits(keyIndex), splits(splits.length - keyIndex - 1))
      }
    }

Both of these files have 10M records with the same unique keys. The size of each file is nearly 280 MB and the block size in HDFS is 256 MB. The output of the join should contain 10M records. We have done some more experiments:
1) Running cogroup instead of join - it also gives an incorrect count.
2) Running union followed by groupByKey and then filtering records with two entries in the sequence - it also gives an incorrect count.
3) Increasing spark.executor.memory to 50g - everything works fine. The count comes to 10M for the join, cogroup and union/groupByKey/filter transformations.
I thought that 10g is enough memory for the executors, but even if the memory is less it should not result in incorrect computation. Probably there is a problem in reconstructing RDDs when memory is not enough. Thanks Chen for your observation. I get this problem on a single worker, so there will not be any mismatch of jars. On two workers, since executor memory gets doubled, the code works fine. Regards, Ajay
On Thursday, June 5, 2014 1:35 AM, Matei Zaharia matei.zaha...@gmail.com wrote: If this isn’t the problem, it would be great if you can post the code for the program. Matei On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote: Maybe your two workers have different assembly jar files? I just ran into a similar problem: my spark-shell was using a different jar file than my workers, and I got really confusing results. On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote: Hi, I am doing a join of two RDDs which gives different results (counting the number of records) each time I run this code on the same input. The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random. Every time, the count of joined records is different. Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1. Regards Ajay
When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?
I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume that this means fully cached) to NODE_LOCAL or even RACK_LOCAL. When these happen things get extremely slow. Does this mean that the executor got terminated and restarted? Is there a way to prevent this from happening (barring the machine actually going down, I'd rather stick with the same process)?
Re: Setting executor memory when using spark-shell
Oh, my apologies, that was for 1.0. For Spark 0.9 I did it like this: MASTER=spark://mymaster:7077 SPARK_MEM=8g ./bin/spark-shell -c $CORES_ACROSS_CLUSTER The downside of this though is that SPARK_MEM also sets the driver's JVM to be 8g, rather than just the executors. I think this is the reason why SPARK_MEM was deprecated. See https://github.com/apache/spark/pull/99 On Thu, Jun 5, 2014 at 2:37 PM, Oleg Proudnikov oleg.proudni...@gmail.com wrote: Thank you, Andrew, I am using Spark 0.9.1 and tried your approach like this: bin/spark-shell --driver-java-options -Dspark.executor.memory=$MEMORY_PER_EXECUTOR I get bad option: '--driver-java-options' There must be something different in my setup. Any ideas? Thank you again, Oleg On 5 June 2014 22:28, Andrew Ash and...@andrewash.com wrote: Hi Oleg, I set the size of my executors on a standalone cluster when using the shell like this: ./bin/spark-shell --master $MASTER --total-executor-cores $CORES_ACROSS_CLUSTER --driver-java-options -Dspark.executor.memory=$MEMORY_PER_EXECUTOR It doesn't seem particularly clean, but it works. Andrew On Thu, Jun 5, 2014 at 2:15 PM, Oleg Proudnikov oleg.proudni...@gmail.com wrote: Hi All, Please help me set the executor JVM memory size. I am using Spark shell and it appears that the executors are started with a predefined JVM heap of 512m as soon as Spark shell starts. How can I change this setting? I tried setting SPARK_EXECUTOR_MEMORY before launching Spark shell: export SPARK_EXECUTOR_MEMORY=1g I also tried several other approaches: 1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker 2) passing it as the -m argument and running bin/start-slave.sh 1 -m 1g on the worker Thank you, Oleg -- Kind regards, Oleg
Re: Join : Giving incorrect result
Hi Ajay, Can you please try running the same code with spark.shuffle.spill=false and see if the numbers turn out correctly? That parameter controls whether or not the buggy code that Matei fixed in ExternalAppendOnlyMap is used. FWIW I saw similar issues in 0.9.0, but no longer in 0.9.1, after I think some fixes in spilling landed. Andrew
On Thu, Jun 5, 2014 at 3:05 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in the way join tasks spill to disk (which happened when you had more concurrent tasks competing for memory). I’ve posted a patch for it here: https://github.com/apache/spark/pull/986. Feel free to try that if you’d like; it will also be in 0.9.2 and 1.0.1. Matei
On Jun 5, 2014, at 12:19 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote: Sorry for replying late. It was night here. Lian/Matei, Here is the code snippet:

    sparkConf.set("spark.executor.memory", "10g")
    sparkConf.set("spark.cores.max", "5")
    val sc = new SparkContext(sparkConf)
    val accId2LocRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_, 0, ',', true))
    val accId2DemoRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_, 0, ',', true))
    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

    def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, retFullLine: Boolean): Tuple2[String, String] = {
      val splits = line.split(delimit)
      if (splits.length <= 1) {
        (null, null)
      } else if (retFullLine) {
        (splits(keyIndex), line)
      } else {
        (splits(keyIndex), splits(splits.length - keyIndex - 1))
      }
    }

Both of these files have 10M records with the same unique keys. The size of each file is nearly 280 MB and the block size in HDFS is 256 MB. The output of the join should contain 10M records. We have done some more experiments:
1) Running cogroup instead of join - it also gives an incorrect count.
2) Running union followed by groupByKey and then filtering records with two entries in the sequence - it also gives an incorrect count.
3) Increasing spark.executor.memory to 50g - everything works fine. The count comes to 10M for the join, cogroup and union/groupByKey/filter transformations.
I thought that 10g is enough memory for the executors, but even if the memory is less it should not result in incorrect computation. Probably there is a problem in reconstructing RDDs when memory is not enough. Thanks Chen for your observation. I get this problem on a single worker, so there will not be any mismatch of jars. On two workers, since executor memory gets doubled, the code works fine. Regards, Ajay
On Thursday, June 5, 2014 1:35 AM, Matei Zaharia matei.zaha...@gmail.com wrote: If this isn’t the problem, it would be great if you can post the code for the program. Matei On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote: Maybe your two workers have different assembly jar files? I just ran into a similar problem: my spark-shell was using a different jar file than my workers, and I got really confusing results. On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote: Hi, I am doing a join of two RDDs which gives different results (counting the number of records) each time I run this code on the same input. The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random. Every time, the count of joined records is different. Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1. Regards Ajay
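A one-line sketch of Andrew's suggestion, applied to the snippet above (in 0.9.x, disabling spark.shuffle.spill makes aggregations use the purely in-memory map instead of ExternalAppendOnlyMap, at the cost of more memory pressure):

    sparkConf.set("spark.shuffle.spill", "false")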
Re: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?
On a related note, I'd also minimize any kind of executor movement. I.e., once an executor is spawned and data cached in the executor, I want that executor to live all the way till the job is finished, or the machine fails in a fatal manner. What would be the best way to ensure that this is the case? On Thu, Jun 5, 2014 at 3:13 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume that this means fully cached) to NODE_LOCAL or even RACK_LOCAL. When these happen things get extremely slow. Does this mean that the executor got terminated and restarted? Is there a way to prevent this from happening (barring the machine actually going down, I'd rather stick with the same process)?
Re: Using Spark on Data size larger than Memory size
Hi Aaron, When you say that sorting is being worked on, can you elaborate a little more please? In particular, I want to sort the items within each partition (not globally) without necessarily bringing them all into memory at once. Thanks, Roger On Sat, May 31, 2014 at 11:10 PM, Aaron Davidson ilike...@gmail.com wrote: There is no fundamental issue if you're running on data that is larger than cluster memory size. Many operations can stream data through, and thus memory usage is independent of input data size. Certain operations require an entire *partition* (not dataset) to fit in memory, but there are not many instances of this left (sorting comes to mind, and this is being worked on). In general, one problem with Spark today is that you *can* OOM under certain configurations, and it's possible you'll need to change from the default configuration if you're doing very memory-intensive jobs. However, there are very few cases where Spark would simply fail as a matter of course -- for instance, you can always increase the number of partitions to decrease the size of any given one, or repartition data to eliminate skew. Regarding impact on performance, as Mayur said, there may absolutely be an impact depending on your jobs. If you're doing a join on a very large amount of data with few partitions, then we'll have to spill to disk. If you can't cache your working set of data in memory, you will also see a performance degradation. Spark enables the use of memory to make things fast, but if you just don't have enough memory, it won't be terribly fast. On Sat, May 31, 2014 at 12:14 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: Clearly there will be an impact on performance, but frankly it depends on what you are trying to achieve with the dataset. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com wrote: Some inputs will be really helpful. Thanks, -Vibhor On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com wrote: Hi all, I am planning to use Spark with HBase, where I generate an RDD by reading data from an HBase table. I want to know, in the case when the size of the HBase table grows larger than the size of RAM available in the cluster, will the application fail, or will there be an impact on performance? Any thoughts in this direction will be helpful and are welcome. Thanks, -Vibhor -- Vibhor Banga Software Development Engineer Flipkart Internet Pvt. Ltd., Bangalore
Re: Using Spark on Data size larger than Memory size
I think it would be very handy to be able to specify that you want sorting during a partitioning stage. On Thu, Jun 5, 2014 at 4:42 PM, Roger Hoover roger.hoo...@gmail.com wrote: Hi Aaron, When you say that sorting is being worked on, can you elaborate a little more please? In particular, I want to sort the items within each partition (not globally) without necessarily bringing them all into memory at once. Thanks, Roger On Sat, May 31, 2014 at 11:10 PM, Aaron Davidson ilike...@gmail.com wrote: There is no fundamental issue if you're running on data that is larger than cluster memory size. Many operations can stream data through, and thus memory usage is independent of input data size. Certain operations require an entire *partition* (not dataset) to fit in memory, but there are not many instances of this left (sorting comes to mind, and this is being worked on). In general, one problem with Spark today is that you *can* OOM under certain configurations, and it's possible you'll need to change from the default configuration if you're doing very memory-intensive jobs. However, there are very few cases where Spark would simply fail as a matter of course -- for instance, you can always increase the number of partitions to decrease the size of any given one, or repartition data to eliminate skew. Regarding impact on performance, as Mayur said, there may absolutely be an impact depending on your jobs. If you're doing a join on a very large amount of data with few partitions, then we'll have to spill to disk. If you can't cache your working set of data in memory, you will also see a performance degradation. Spark enables the use of memory to make things fast, but if you just don't have enough memory, it won't be terribly fast. On Sat, May 31, 2014 at 12:14 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: Clearly there will be an impact on performance, but frankly it depends on what you are trying to achieve with the dataset. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com wrote: Some inputs will be really helpful. Thanks, -Vibhor On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com wrote: Hi all, I am planning to use Spark with HBase, where I generate an RDD by reading data from an HBase table. I want to know, in the case when the size of the HBase table grows larger than the size of RAM available in the cluster, will the application fail, or will there be an impact on performance? Any thoughts in this direction will be helpful and are welcome. Thanks, -Vibhor -- Vibhor Banga Software Development Engineer Flipkart Internet Pvt. Ltd., Bangalore
Re: Using Spark on Data size larger than Memory size
Hi Roger, You should be able to sort within partitions using the rdd.mapPartitions() method, and that shouldn't require holding all data in memory at once. It does require holding the entire partition in memory, though. Do you need the partition to never be held in memory all at once? As for the work Aaron mentioned, I think he might be referring to the discussion and code surrounding https://issues.apache.org/jira/browse/SPARK-983 Cheers! Andrew On Thu, Jun 5, 2014 at 5:16 PM, Roger Hoover roger.hoo...@gmail.com wrote: I think it would be very handy to be able to specify that you want sorting during a partitioning stage. On Thu, Jun 5, 2014 at 4:42 PM, Roger Hoover roger.hoo...@gmail.com wrote: Hi Aaron, When you say that sorting is being worked on, can you elaborate a little more please? In particular, I want to sort the items within each partition (not globally) without necessarily bringing them all into memory at once. Thanks, Roger On Sat, May 31, 2014 at 11:10 PM, Aaron Davidson ilike...@gmail.com wrote: There is no fundamental issue if you're running on data that is larger than cluster memory size. Many operations can stream data through, and thus memory usage is independent of input data size. Certain operations require an entire *partition* (not dataset) to fit in memory, but there are not many instances of this left (sorting comes to mind, and this is being worked on). In general, one problem with Spark today is that you *can* OOM under certain configurations, and it's possible you'll need to change from the default configuration if you're doing very memory-intensive jobs. However, there are very few cases where Spark would simply fail as a matter of course -- for instance, you can always increase the number of partitions to decrease the size of any given one, or repartition data to eliminate skew. Regarding impact on performance, as Mayur said, there may absolutely be an impact depending on your jobs. If you're doing a join on a very large amount of data with few partitions, then we'll have to spill to disk. If you can't cache your working set of data in memory, you will also see a performance degradation. Spark enables the use of memory to make things fast, but if you just don't have enough memory, it won't be terribly fast. On Sat, May 31, 2014 at 12:14 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: Clearly there will be an impact on performance, but frankly it depends on what you are trying to achieve with the dataset. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com wrote: Some inputs will be really helpful. Thanks, -Vibhor On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com wrote: Hi all, I am planning to use Spark with HBase, where I generate an RDD by reading data from an HBase table. I want to know, in the case when the size of the HBase table grows larger than the size of RAM available in the cluster, will the application fail, or will there be an impact on performance? Any thoughts in this direction will be helpful and are welcome. Thanks, -Vibhor -- Vibhor Banga Software Development Engineer Flipkart Internet Pvt. Ltd., Bangalore
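A minimal sketch of the mapPartitions approach Andrew describes (assumes a pair RDD whose keys have an Ordering and whose individual partitions fit in memory; the ordering is per-partition, not global):

    val sortedWithinPartitions = rdd.mapPartitions { iter =>
      // Materialize one partition, sort it by key, and hand back an iterator.
      // Peak memory is bounded by the largest partition, not the whole RDD.
      iter.toArray.sortBy(_._1).iterator
    }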
Re: How to shut down Spark Streaming with Kafka properly?
Sean, your patch fixes the issue, thank you so much! (This is the second time within one week that I've run into network libraries not shutting down their threads properly, so I'm really glad your code fixes the issue.) I saw your pull request is closed but not merged yet. Can I do anything to get your fix into Spark? Open an issue, send a pull request myself, etc.? Thanks Tobias
Re: Setting executor memory when using spark-shell
just use -Dspark.executor.memory= -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setting-executor-memory-when-using-spark-shell-tp7082p7103.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Twitter feed options?
Me again, Things have been going well, actually. I've got my build chain sorted; 1.0.0 and streaming are working reliably. I managed to turn off the INFO messages by messing with every log4j properties file on the system. :-) One thing I would like to try now is some natural language processing on some selected twitter streams (ie: my own), but the streaming example seems to be 'sipping from the firehose'. I'm combing through the twitter4j documentation now, but does anyone know a simple way of restricting the 'flood' to just my own timeline? Otherwise, yes, this is now the fun part! -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
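For what it's worth, the built-in receiver can at least narrow the flood with track keywords (a hedged sketch; the handle is hypothetical, and as far as I know these filters match tweet text via Twitter's streaming API rather than restricting to one user's timeline, which would need a different twitter4j endpoint and a custom receiver, as discovered in the follow-up below):

    import org.apache.spark.streaming.twitter.TwitterUtils

    // None means "use the default twitter4j authorization from system
    // properties"; the Seq is a list of track keywords.
    val tweets = TwitterUtils.createStream(ssc, None, Seq("@my_handle"))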
RE: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?
If a task has no locality preference, it will also show up as PROCESS_LOCAL; we should probably name that case NO_PREFER to make it clearer. Not sure if this is your case. Best Regards, Raymond Liu From: coded...@gmail.com [mailto:coded...@gmail.com] On Behalf Of Sung Hwan Chung Sent: Friday, June 06, 2014 6:53 AM To: user@spark.apache.org Subject: Re: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL? Additionally, I've encountered a confusing situation where the locality level for a task showed up as PROCESS_LOCAL even though I didn't cache the data. I wonder whether some implicit caching happens even without the user specifying it. On Thu, Jun 5, 2014 at 3:50 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Thanks Andrew, Is there a chance that, even with full caching, modes other than PROCESS_LOCAL will be used? E.g., an executor will try to perform tasks although the data are cached on a different executor. What I'd like to do is prevent such a scenario entirely. I'd like to know if setting 'spark.locality.wait' to a very high value would guarantee that the mode will always be PROCESS_LOCAL. On Thu, Jun 5, 2014 at 3:36 PM, Andrew Ash and...@andrewash.com wrote: The locality is how close the data is to the code that's processing it. PROCESS_LOCAL means data is in the same JVM as the code that's running, so it's really fast. NODE_LOCAL might mean that the data is in HDFS on the same node, or in another executor on the same node, so it is a little slower because the data has to travel across an IPC connection. RACK_LOCAL is even slower -- the data is on a different server, so it needs to be sent over the network. Spark switches to lower locality levels when there's no unprocessed data on a node that has idle CPUs. In that situation you have two options: wait until the busy CPUs free up so you can start another task that uses data on that server, or start a new task on a farther away server that needs to bring data from that remote place. What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The main tunable option is how long the scheduler waits before starting to move data rather than code. Those are the spark.locality.* settings here: http://spark.apache.org/docs/latest/configuration.html If you want to prevent this from happening entirely, you can set the values to ridiculously high numbers. The documentation also mentions that 0 has special meaning, so you can try that as well. Good luck! Andrew On Thu, Jun 5, 2014 at 3:13 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume that this means fully cached) to NODE_LOCAL or even RACK_LOCAL. When these happen things get extremely slow. Does this mean that the executor got terminated and restarted? Is there a way to prevent this from happening (barring the machine actually going down, I'd rather stick with the same process)?
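A one-liner sketch of Andrew's "ridiculously high value" approach (hedged: in Spark 0.9/1.0, spark.locality.wait is in milliseconds and defaults to 3000; the value below is an arbitrary huge number):

    // Keep waiting for a PROCESS_LOCAL slot instead of falling back
    // to NODE_LOCAL / RACK_LOCAL.
    val conf = new org.apache.spark.SparkConf().set("spark.locality.wait", "100000000")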
Re: spark worker and yarn memory
Nice explanation... Thanks! On Thu, Jun 5, 2014 at 5:50 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Xu, As crazy as it might sound, this all makes sense. There are a few different quantities at play here: * the heap size of the executor (controlled by --executor-memory) * the amount of memory Spark requests from YARN (the heap size plus 384 MB to account for fixed memory costs outside of the heap) * the amount of memory YARN grants to the container (YARN rounds up to the nearest multiple of yarn.scheduler.minimum-allocation-mb or yarn.scheduler.fair.increment-allocation-mb, depending on the scheduler used) * the amount of memory Spark uses for caching on each executor, which is spark.storage.memoryFraction (default 0.6) of the executor heap size So, with --executor-memory 8g, Spark requests 8g + 384m from YARN, which doesn't fit into its container max. With --executor-memory 7g, Spark requests 7g + 384m from YARN, which fits into its container max. This gets rounded up to 8g by the YARN scheduler. 7g is still used as the executor heap size, and 0.6 of this is about 4g, shown as the cache space in the Spark UI. -Sandy On Jun 5, 2014, at 9:44 AM, Xu (Simon) Chen xche...@gmail.com wrote: I am slightly confused about the --executor-memory setting. My YARN cluster has a maximum container memory of 8192MB. When I specify --executor-memory 8G in my spark-shell, no container can be started at all. It only works when I lower the executor memory to 7G. But then, on YARN, I see 2 containers per node, using 16G of memory. Then on the Spark UI, it shows that each worker has 4GB of memory, rather than 7. Can someone explain the relationship among the numbers I see here? Thanks.
Re: Twitter feed options?
Nope, sorry, nevermind! I looked at the source, and it was pretty obvious that it didn't implement that yet, so I've ripped the classes out and am mutating them into new receivers right now... ... starting to get the hang of this. On Fri, Jun 6, 2014 at 1:07 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Me again, Things have been going well, actually. I've got my build chain sorted; 1.0.0 and streaming are working reliably. I managed to turn off the INFO messages by messing with every log4j properties file on the system. :-) One thing I would like to try now is some natural language processing on some selected twitter streams (ie: my own), but the streaming example seems to be 'sipping from the firehose'. I'm combing through the twitter4j documentation now, but does anyone know a simple way of restricting the 'flood' to just my own timeline? Otherwise, yes, this is now the fun part! -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
KyroException: Unable to find class
Hello, I have been using Externalizer from Chill as a serialization wrapper. It appears to me that Spark has some classloader conflict with Chill. I have the following (simplified) program:

    import java.io._
    import com.twitter.chill.Externalizer

    class X(val i: Int) { override def toString() = s"X(i=$i)" }

    object SimpleApp {
      def main(args: Array[String]) {
        val bos = new ByteArrayOutputStream(10)
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(Externalizer(new X(10)))
        oos.close()
        val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
        val y = ois.readObject.asInstanceOf[Externalizer[X]]
        println(y.get)
      }
    }

When I run it as a normal program (i.e. sbt run), the program runs fine. But when I run it with spark-submit (i.e. spark-submit --verbose --class SimpleApp --master local[4] target/scala-2.10/simple-project_2.10-1.0.jar), the program fails at the ois.readObject call. I get an error that Kryo fails to find the class X:
Exception in thread main com.esotericsoftware.kryo.KryoException: Unable to find class: X
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
I guess the issue is that Spark has a magic classloader, and Kryo fails to see the same classpaths. Is there any way to remedy this issue? Thanks. Justin
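One thing that may be worth trying (a hedged sketch, not a confirmed fix): set the thread's context classloader to the loader that actually loaded the application classes before calling readObject, in case the Kryo instance inside Externalizer resolves class names through it:

    val prev = Thread.currentThread.getContextClassLoader
    try {
      // getClass.getClassLoader here is the loader that found SimpleApp
      // (and hence X) inside the application jar.
      Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
      val y = ois.readObject.asInstanceOf[Externalizer[X]]
      println(y.get)
    } finally {
      Thread.currentThread.setContextClassLoader(prev)
    }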
Spark Streaming NetworkReceiver problems
Hi, here is the problem description: I wrote a custom NetworkReceiver to receive image data from a camera. I have confirmed that all the data is received correctly.
1) When data is received, only the receiver node runs at full speed, while the other nodes stay idle; my Spark cluster has 6 nodes.
2) Every image is processed many times, although I expected each image to be processed once.
3) How do I distribute tasks across the whole cluster? I tried dstream.repartition(...).map(), but the problem in 2) still exists, and sometimes the job failed.
Here is the Spark configuration:

    conf.set("spark.executor.memory", "3g")
    conf.set("spark.shuffle.netty.connect.timeout", "30")
    conf.set("spark.storage.blockManagerSlaveTimeoutMs", "30")
    conf.set("spark.akka.timeout", "600")
    conf.set("spark.akka.threads", "8")

Node: 8 cores, 6G memory. Network: 1 Gbps. Any suggestions will be appreciated. thanks, QingFeng -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-NeteorkReceiver-problems-tp7109.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
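A hedged sketch of the usual pattern for spreading receiver input across a cluster (names and numbers are hypothetical; repartitioning shuffles received blocks to all nodes, and persisting the repartitioned stream avoids recomputing the per-image work when several output operations consume the same DStream, which may explain symptom 2):

    // imageStream: the DStream produced by the custom receiver
    val spread = imageStream.repartition(12)  // e.g. 6 nodes x 2 cores; tune as needed
    spread.persist()
    val results = spread.map(processImage _)  // processImage: your per-image computation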