Re: Why collect() has a stage but first() not?
first() is allowed to run locally, which means that the driver will execute the action itself without launching any tasks. This is also true of take(n) for sufficiently small n, for instance. On Wed, Feb 19, 2014 at 9:55 AM, David Thomas dt5434...@gmail.com wrote: If I perform a 'collect' action on the RDD, I can see a new stage getting created in the spark web UI (http://master:4040/stages/), but when I do a 'first' action, I don't see any stage getting created. However on the console I see these lines: 14/02/19 10:51:31 INFO SparkContext: Starting job: first at xxx.scala:110 14/02/19 10:51:31 INFO DAGScheduler: Got job 110 (first at xxx.scala:110) with 1 output partitions (allowLocal=true) 14/02/19 10:51:31 INFO DAGScheduler: Final stage: Stage 2 (first at xxx.scala:110) 14/02/19 10:51:31 INFO DAGScheduler: Parents of final stage: List() 14/02/19 10:51:31 INFO DAGScheduler: Missing parents: List() So why doesn't the webUI list the stages created when I run the 'first' action?
Re: Why collect() has a stage but first() not?
The driver does query for the first partition of the RDD using the BlockManager. If the RDD is cached, the worker process that has the first partition in memory will ship it back to the driver, and the driver will iterate over it. On Wed, Feb 19, 2014 at 9:59 AM, David Thomas dt5434...@gmail.com wrote: But my RDD is placed on the worker nodes. So how can driver perform the action by itself? On Wed, Feb 19, 2014 at 10:57 AM, Aaron Davidson ilike...@gmail.comwrote: first() is allowed to run locally, which means that the driver will execute the action itself without launching any tasks. This is also true of take(n) for sufficiently small n, for instance. On Wed, Feb 19, 2014 at 9:55 AM, David Thomas dt5434...@gmail.comwrote: If I perform a 'collect' action on the RDD, I can see a new stage getting created in the spark web UI (http://master:4040/stages/), but when I do a 'first' action, I don't see any stage getting created. However on the console I see these lines: 14/02/19 10:51:31 INFO SparkContext: Starting job: first at xxx.scala:110 14/02/19 10:51:31 INFO DAGScheduler: Got job 110 (first at xxx.scala:110) with 1 output partitions (allowLocal=true) 14/02/19 10:51:31 INFO DAGScheduler: Final stage: Stage 2 (first at xxx.scala:110) 14/02/19 10:51:31 INFO DAGScheduler: Parents of final stage: List() 14/02/19 10:51:31 INFO DAGScheduler: Missing parents: List() So why doesn't the webUI list the stages created when I run the 'first' action?
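To make the difference concrete, here is a minimal sketch (assuming a spark-shell session; the HDFS path is illustrative):

    val rdd = sc.textFile("hdfs:///data/input.txt").cache()
    rdd.collect()  // launches tasks, so a stage shows up in the web UI
    rdd.first()    // may run locally on the driver (allowLocal=true), so no stage appears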
Re: Using local[N] gets Too many open files?
If you are intentionally opening many files at once and getting that error, then it is a fixable OS issue. Please check out this discussion regarding changing the file limit in /etc/limits.conf: http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-td1464.html If you feel that your job should not be opening so many files at a time, then please give a little more detail about the nature of your job. A few questions bear answering: Are you using a standard Spark input method, such as sc.textFile()? This should only have one open file per partition per core (so 8 concurrently in your case). Are you performing any sort of join or shuffle operation? This can create intermediate shuffle or external sorting files. Shuffling an RDD into N partitions will cause us to open N files at a time (per core), so that could be up to 800k in your case. You can reduce this by shuffling your 100k input partitions into many fewer output partitions, assuming that each file is actually small. This can be set as a parameter to any shuffle-inducing operation. If your job is using external sorting to avoid OOMing (which it will warn you about in the executor logs with messages like Spilling in-memory map...), then you may have arbitrarily many files open. This is very unlikely to happen if you've split your input into as many files as you said, though. On Sun, Feb 16, 2014 at 6:18 PM, Matthew Cheah matthew.c.ch...@gmail.comwrote: Hi everyone, I'm experimenting with Spark in both a distributed environment and as a multi-threaded local application. When I set the spark master to local[8] and attempt to read a ~20GB text file on the local file system into an RDD and perform computations on it, I don't get an out of memory error, but rather a Too many open files error. Is there a reason why this happens? How aggressively is Spark partitioning the data into intermediate files? I have also tried splitting the text file into numerous text files - around 100,000 of them - and processing 10,000 of them at a time sequentially. However then Spark seems to get bottlenecked on reading each individual file into the RDD before proceeding with the computation. This has issues even reading 10,000 files at once. I would have thought that Spark could do I/O in parallel with computation, but it seems that Spark does all of the I/O first? I was wondering if Spark was simply just not built for local applications outside of testing. Thanks, -Matt Cheah
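As an illustration of passing a partition count to a shuffle-inducing operation (a sketch; the path and counts are made up, assuming a spark-shell session):

    val lines = sc.textFile("/data/parts/*")   // e.g., 100k small input partitions
    val counts = lines.map(l => (l, 1))
                      .reduceByKey(_ + _, 200) // shuffle into 200 output partitions instead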
Re: [External] Re: Too many open files
Note that you may just be changing the soft limit, and are still being capped by the hard (system-wide) limit. Changing the /etc/limits.conf file as specified above allows you to modify both the soft and hard limits, and requires a restart of the machine to take effect. On Thu, Feb 13, 2014 at 11:05 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: Did you change your system wide limit? Mayur Rustagi Ph: +919632149971 http://www.sigmoidanalytics.com https://twitter.com/mayur_rustagi On Thu, Feb 13, 2014 at 10:48 AM, Korb, Michael [USA] korb_mich...@bah.com wrote: My ulimit is already unlimited. Wouldn't setting it to 18000 be decreasing it? Could it be an issue other than ulimit? Because I can't increase it beyond unlimited. Thanks, Mike From: Mayur Rustagi mayur.rust...@gmail.com Reply-To: user@spark.incubator.apache.org user@spark.incubator.apache.org Date: Thursday, February 13, 2014 12:58 PM To: user@spark.incubator.apache.org user@spark.incubator.apache.org Subject: Re: [External] Re: Too many open files Easiest is to add ulimit -n 18000 to your conf/spark-env.sh and restart the cluster.. make sure you change the limits at OS level as well. Regards Mayur Mayur Rustagi Ph: +919632149971 http://www.sigmoidanalytics.com https://twitter.com/mayur_rustagi On Thu, Feb 13, 2014 at 9:51 AM, Korb, Michael [USA] korb_mich...@bah.com wrote: No I don't. I ran all Spark processes as a user with ulimit = unlimited. From: Mayur Rustagi mayur.rust...@gmail.com Reply-To: user@spark.incubator.apache.org user@spark.incubator.apache.org Date: Thursday, February 13, 2014 12:34 PM To: user@spark.incubator.apache.org user@spark.incubator.apache.org Subject: [External] Re: Too many open files The limit could be on any of the machines (including the master). Do you have Ganglia set up? Mayur Rustagi Ph: +919632149971 http://www.sigmoidanalytics.com https://twitter.com/mayur_rustagi On Thu, Feb 13, 2014 at 7:13 AM, Korb, Michael [USA] korb_mich...@bah.com wrote: Hi, When I submit a job to a cluster, I get a large string of errors like this: WARN TaskSetManager: Loss was due to java.io.FileNotFoundException java.io.FileNotFoundException: /tmp/spark-local* (Too many open files) All the answers I can find say to increase ulimit, but I have it set to unlimited (as the user running the spark daemons as well as the user submitting the job) and am still getting the error. I'm attempting to create an RDD like this: sc.textFile("/path/to/files/*").persist(StorageLevel.MEMORY_ONLY_SER()), and run a series of maps and filters on the data. There are about 2k files for a total of about 230g of data, and my current cluster is 3 nodes, 32 cores each, with spark.executor.memory set to 32g. I've tried different StorageLevel settings but still have the same error. Interestingly, the job works if I write and submit with pyspark, but I want to get it working in Java. Thanks, Mike
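For reference, a sketch of the relevant entries (assuming the file lives at /etc/security/limits.conf, as it does on many distributions, and a hypothetical user named spark):

    # raise both the soft and hard open-file limits for the spark user
    spark  soft  nofile  32768
    spark  hard  nofile  32768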
Re: Shuffle file not found Exception
This sounds bad, and probably related to shuffle file consolidation. Turning off consolidation would probably get you working again, but I'd really love to track down the bug. Do you know if any tasks fail before those errors start occurring? It's very possible that another exception is occurring which is causing a file to not be written -- I think I've seen latent OOMEs induce this behavior, for instance. On Sun, Feb 9, 2014 at 8:28 AM, Guillaume Pitel guillaume.pi...@exensa.com wrote: Hi, I've got a strange problem with 0.8.1 (we're going to make the jump to 0.9.0 in a few days, but for now I'm working with a 0.8.1 cluster): after a few iterations of my method, one random node of my local cluster throws an exception like this: FileNotFoundException: /sparktmp/spark-local-20140209073949-29b1/37/merged_shuffle_24_23_1 (No such file or directory) Then either the job gets stuck for hours, or it fails right away. I've got the ulimit at 131k files, and consolidateFiles=true, so I don't think it's a problem related to the number of file descriptors. Guillaume -- *Guillaume PITEL, Président* +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
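A sketch of turning consolidation off, assuming the 0.8.x convention of Java system properties read when the SparkContext is created (master URL and app name are illustrative):

    import org.apache.spark.SparkContext
    // must be set before the context is constructed
    System.setProperty("spark.shuffle.consolidateFiles", "false")
    val sc = new SparkContext("spark://master:7077", "app")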
Re: Hash Join in Spark
This method is doing very little. Line 2 constructs the CoGroupedRDD, which will do all the real work. Note that while this cogroup function just groups 2 RDDs together, CoGroupedRDD allows general n-way cogrouping, so it takes a Seq[RDD[(K, _)]] rather than just 2 such key-value RDDs. The rest of the code in this method is simply converting the result of CoGroupedRDD back from its generalized form into an RDD[(K, (Seq[V], Seq[W]))]. (CoGroupedRDD returns an RDD[(K, Seq[Seq[_]])] as there are n of those Seq[_]s, one per grouping RDD.) To go over some of the finer points of these remaining lines: 3. This line is actually not necessary, and is simply confusing. I have submitted a small patch (https://github.com/apache/incubator-spark/pull/530) to remove it.* 4. mapValues will iterate through the results of the CoGroupedRDD (i.e., the already-cogrouped values) in order to change the type of the return value from the generic Seq[Seq[_]] to a (Seq[V], Seq[W]), since we know each Seq has exactly 2 elements. The remainder of this line simply does the casting from Seq[_] to Seq[V] or Seq[W] as appropriate. * Here's a real explanation for line 3, in case you're curious about the Scala magic that's going on. Normally, all RDDs that look like key-value pairs (having a generic type of Tuple2, like [K, V]) are implicitly converted (http://tomjefferys.blogspot.com/2011/11/implicit-conversions-in-scala.html) to PairRDDFunctions, to provide extra functions that can operate over these types of RDDs. For reasons slightly unclear, the author of this code chose to forgo using the implicit conversion in favor of explicitly converting the CoGroupedRDD into a PairRDDFunctions in order to gain access to the mapValues method. On Sun, Feb 2, 2014 at 8:47 PM, rose kunj rosek...@yahoo.com wrote: Since my earlier question is still unanswered, I have decided to dig into the spark code myself. However, I am new to spark as well as scala in particular. Can someone help me understand the following code snippet: 1. def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { 2. val cg = new CoGroupedRDD[K](Seq(self, other), partitioner) 3. val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag) 4. prfs.mapValues { case Seq(vs, ws) => (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) 5. } 6. } Thanks, rose On Friday, January 24, 2014 4:32 PM, rose rosek...@yahoo.com wrote: Hi all, I want to know more about the join operation in spark. I know it uses hash join, but I am not able to figure out the nature of the implementation, such as blocking vs. non-blocking, or shared vs. not-shared partitions. If anyone knows, please reply to this post along with links to the implementation in the spark source files. Thanks, rose -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Hash-Join-in-Spark-tp873.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
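For reference, a small usage sketch of the cogroup method being discussed (assuming a spark-shell session; the data is illustrative):

    val a = sc.parallelize(Seq((1, "x"), (2, "y")))
    val b = sc.parallelize(Seq((1, 10), (1, 11), (3, 12)))
    val grouped = a.cogroup(b)  // RDD[(Int, (Seq[String], Seq[Int]))]
    // e.g., key 1 maps to (Seq("x"), Seq(10, 11)); key 3 maps to (Seq(), Seq(12))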
Re: Spark app gets slower as it gets executed more times
Are you seeing any exceptions in between running apps? Does restarting the master/workers actually cause Spark to speed back up again? It's possible, for instance, that you run out of disk space, which should cause exceptions but not go away by restarting the master/workers. One thing to worry about is long-running jobs or shells. Currently, state buildup of a single job in Spark *is* a problem, as certain state such as shuffle files and RDD metadata is not cleaned up until the job (or shell) exits. We have hacky ways to reduce this, and are working on a long term solution. However, separate, consecutive jobs should be independent in terms of performance. On Sat, Feb 1, 2014 at 8:27 PM, 尹绪森 yinxu...@gmail.com wrote: Is your spark app an iterative one? If so, your app is creating a big DAG in every iteration. You should checkpoint it periodically, say, one checkpoint every 10 iterations. 2014-02-01 Aureliano Buendia buendia...@gmail.com: Hi, I've noticed my spark app (on ec2) gets slower and slower as I repeatedly execute it. With a fresh ec2 cluster, it is snappy and takes about 15 mins to complete; after running the same app 4 times it gets slower and takes 40 mins or more. While the cluster gets slower, the monitoring metrics show less and less activity (almost no CPU or IO). When it gets slow, sometimes the number of running tasks (light blue in web ui progress bar) is zero, and only the number of completed tasks (dark blue) increments. Is this a known spark issue? Do I need to restart spark master and workers in between running apps? -- Best Regards --- Xusen Yin 尹绪森 Beijing Key Laboratory of Intelligent Telecommunications Software and Multimedia Beijing University of Posts and Telecommunications Intel Labs China Homepage: http://yinxusen.github.io/
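A sketch of periodic checkpointing in an iterative job (initialRdd and step are hypothetical placeholders; the directory is illustrative):

    sc.setCheckpointDir("hdfs:///tmp/checkpoints")
    var data = initialRdd
    for (i <- 1 to 100) {
      data = step(data).cache()
      if (i % 10 == 0) data.checkpoint()  // truncate the growing lineage every 10 iterations
    }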
Re: default parallelism in trunk
Could you give an example where default parallelism is set to 2 where it didn't used to be? Here is the relevant section for the spark standalone mode: CoarseGrainedSchedulerBackend.scala#L211 (https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L211). If spark.default.parallelism is set, it will override anything else. If it is not set, we will use the greater of the total number of cores in the cluster and 2, which is the same logic that has been used since spark-0.7 (https://github.com/apache/incubator-spark/blob/branch-0.7/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala#L156). Simplest possibility is that you're setting spark.default.parallelism, otherwise there may be a bug introduced somewhere that isn't defaulting correctly anymore. On Sat, Feb 1, 2014 at 12:30 AM, Koert Kuipers ko...@tresata.com wrote: i just managed to upgrade my 0.9-SNAPSHOT from the last scala 2.9.x version to the latest. everything seems good except that my default parallelism is now set to 2 for jobs instead of some smart number based on the number of cores (i think that is what it used to do). is this change on purpose? i am running spark standalone. thx, koert
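A sketch of setting the property explicitly (assuming the 0.8/0.9-era convention of Java system properties set before the SparkContext is created; master URL and app name are illustrative):

    import org.apache.spark.SparkContext
    System.setProperty("spark.default.parallelism", "64")
    val sc = new SparkContext("spark://master:7077", "app")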
Re: How to access global kryo instance?
I see -- the answer is no, we currently do not use an object pool, but instead just try to create it less frequently (typically one SerializerInstance per partition). For instance, you could do: rdd.mapPartitions { partitionIterator => val kryo = SparkEnv.get.serializer.asInstanceOf[KryoSerializer].newKryo(); partitionIterator.map(row => doWorkWithKryo(kryo, row)) } This should amortize the cost greatly. The only requirement of an instance is that it not be used by multiple threads simultaneously, and this fits that requirement perfectly. On Mon, Jan 6, 2014 at 6:59 PM, Aureliano Buendia buendia...@gmail.com wrote: On Tue, Jan 7, 2014 at 2:52 AM, Aaron Davidson ilike...@gmail.com wrote: Please take a look at the source code -- it's relatively friendly, and very useful for digging into Spark internals! (KryoSerializer: https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala) As you can see, a Kryo instance is available via ser.newKryo(). You can also use Spark's SerializerInstance interface (which features serialize() and deserialize() methods) by simply calling ser.newInstance(). Sorry, maybe I wasn't clear. What I meant was, does spark use a singleton instance of kryo that can be accessed inside the map closure? Calling ser.newKryo() for every element (inside a map closure) has a huge overhead, and it seems newKryo() doesn't use any caching. Twitter chill uses an object pool for kryo instances; I'm not sure how spark handles this. On Mon, Jan 6, 2014 at 5:20 PM, Aureliano Buendia buendia...@gmail.com wrote: In a map closure, I could use: val ser = SparkEnv.get.serializer.asInstanceOf[KryoSerializer] But how to get the instance of Kryo that spark uses from ser? On Tue, Jan 7, 2014 at 1:04 AM, Aaron Davidson ilike...@gmail.com wrote: I believe SparkEnv.get.serializer would return the serializer created from the spark.serializer property. You can also obtain a Kryo serializer directly via its no-arg constructor (it still invokes your spark.kryo.registrator): val serializer = new KryoSerializer() but this could have some overhead, and so should probably not be done for every element you process. On Mon, Jan 6, 2014 at 4:36 PM, Aureliano Buendia buendia...@gmail.com wrote: Hi, Is there a way to access the global kryo instance created by spark? I'm referring to the one which is passed to registerClasses() in a KryoRegistrator sub class. I'd like to access this kryo instance inside a map closure, so it should be accessible from the worker side too.
Re: ADD_JARS doesn't properly work for spark-shell
Cool. To confirm, you said you can access the class and construct new objects -- did you do this in the shell itself (i.e., on the driver), or on the executors? Specifically, one of the following two should fail in the shell: new mypackage.MyClass() sc.parallelize(0 until 10, 2).foreach(_ => new mypackage.MyClass()) (or just import it) You could also try running MASTER=local-cluster[2,1,512], which launches 2 executors, 1 core each, with 512MB in a setup that mimics a real cluster more closely, in case it's a bug only related to using local mode. On Sat, Jan 4, 2014 at 7:07 PM, Aureliano Buendia buendia...@gmail.com wrote: On Sun, Jan 5, 2014 at 2:28 AM, Aaron Davidson ilike...@gmail.com wrote: Additionally, which version of Spark are you running? 0.8.1. Unfortunately, this doesn't work either: MASTER=local[2] ADD_JARS=/path/to/my/jar SPARK_CLASSPATH=/path/to/my/jar ./spark-shell On Sat, Jan 4, 2014 at 6:27 PM, Aaron Davidson ilike...@gmail.com wrote: I am not an expert on these classpath issues, but if you're using local mode, you might also try to set SPARK_CLASSPATH to include the path to the jar file as well. This should not really help, since adding jars is the right way to get the jars to your executors (which is where the exception appears to be happening), but it would sure be interesting if it did. On Sat, Jan 4, 2014 at 4:50 PM, Aureliano Buendia buendia...@gmail.com wrote: I should add that I can see in the log that the jar is being shipped to the workers: 14/01/04 15:34:52 INFO Executor: Fetching http://192.168.1.111:51031/jars/my.jar.jar with timestamp 131979092 14/01/04 15:34:52 INFO Utils: Fetching http://192.168.1.111:51031/jars/my.jar.jar to /var/folders/3g/jyx81ctj3698wbvphxhm4dw4gn/T/fetchFileTemp8322008964976744710.tmp 14/01/04 15:34:53 INFO Executor: Adding file:/var/folders/3g/jyx81ctj3698wbvphxhm4dw4gn/T/spark-d8ac8f66-fad6-4b3f-8059-73f13b86b070/my.jar.jar to class loader On Sun, Jan 5, 2014 at 12:46 AM, Aureliano Buendia buendia...@gmail.com wrote: Hi, I'm trying to access my stand alone spark app from spark-shell. I tried starting the shell by: MASTER=local[2] ADD_JARS=/path/to/my/jar ./spark-shell The log shows that the jar file was loaded. Also, I can access and create a new instance of mypackage.MyClass.
The problem is that myRDD.collect() returns RDD[MyClass], and that throws this exception: java.lang.ClassNotFoundException: mypackage.MyClass at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:423) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:356) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:622) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1593) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1642) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1341) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at org.apache.spark.util.Utils$.deserialize(Utils.scala:59) at org.apache.spark.SparkContext$$anonfun$objectFile$1.apply(SparkContext.scala:573) at org.apache.spark.SparkContext$$anonfun$objectFile$1.apply(SparkContext.scala:573) at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:702) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:698) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:872) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:872) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) Does this mean that my jar was not shipped to the workers? Is this a known issue, or am I doing something wrong here?
Re: NPE while reading broadcast variable.
Could you post the stack trace you see for the NPE? On Mon, Dec 30, 2013 at 11:31 AM, Archit Thakur archit279tha...@gmail.com wrote: I am still getting it. I googled and found a similar open problem on stackoverflow: http://stackoverflow.com/questions/17794664/accumulator-fails-on-cluster-works-locally . Thx, Archit_Thakur. On Mon, Dec 23, 2013 at 11:32 AM, Archit Thakur archit279tha...@gmail.com wrote: Accessed it: val CURRENT_EXECUTING = ClusterVariableEnumeration.CURRENT_EXECUTION.value.asInstanceOf[String] On Mon, Dec 23, 2013 at 11:27 AM, Archit Thakur archit279tha...@gmail.com wrote: Hi, I am getting an NPE while I access a broadcast variable. I created an object: object ClusterVariableEnumeration { var CURRENT_EXECUTION: org.apache.spark.broadcast.Broadcast[Any] = null.asInstanceOf[org.apache.spark.broadcast.Broadcast[Any]]; } and then after creating the SparkContext, I did ClusterVariableEnumeration.CURRENT_EXECUTION = sc.broadcast(value) and then in a map function, when I tried to access it, it gave me an NPE. Idea? Thanks and Regards, Archit Thakur.
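A likely cause, though not confirmed in this thread: a Scala object is initialized independently in each executor JVM, so a var assigned on the driver stays null on the workers. Capturing the Broadcast handle in a local val that is serialized with the closure avoids this. A sketch, assuming a spark-shell session:

    val current = sc.broadcast("execution-42")      // illustrative value
    val rdd = sc.parallelize(1 to 10)
    rdd.map { x => (current.value, x) }.collect()   // the Broadcast handle ships with the closure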
Re: How to set Akka frame size
The error you're receiving is because the Akka frame size must be a positive Java Integer, i.e., less than 2^31. However, the frame size is not intended to be nearly the size of the job memory -- it is the smallest unit of data transfer that Spark does. In this case, your task result size is exceeding 10MB, which means that returning the results for a single partition of your data requires more than 10MB. It appears that the default JavaWordCount example has a minSplits value of 1 (ctx.textFile(args[1], 1)). This really means that the number of partitions will be max(1, # hdfs blocks in file). If you have an HDFS block of size ~64MB and all distinct words, the resulting task result may be around the same size, which is well over 10MB. You have two complementary solutions: 1. Increase the value of minSplits to reduce the size of any single TaskResult, like: ctx.textFile(args[1], 256) 2. Increase the Akka frame size by a small amount (e.g., to 20-70MB). Please note that this issue, while annoying, is in good part due to the lack of realism of this example. You very rarely call collect() in Spark in actual usage, as that will put *all* your output data on the driver machine. Much more likely you'd save to an HDFS file or compute the top 100 words or something like that, which would not have this problem. (One final note about your configuration: the Spark Worker is simply responsible for spawning Executors, which do the actual computation. As such, it is typical not to change the Worker memory at all [as it needs very little] but rather to give the majority of a machine's memory distributed amongst the Executors. If each machine has 16 GB of RAM and 4 cores, for example, you might set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark.) On Tue, Dec 24, 2013 at 3:58 AM, leosand...@gmail.com leosand...@gmail.com wrote: Hi everyone, I have a question about the arg spark.akka.frameSize; its default value is 10m. I executed JavaWordCount reading data from HDFS, where there is a 7G file. There was an OOM error caused by some task result exceeding the Akka frame size. But when I modify the arg to 1G, 2G, 10G, it shows me: ERROR ClusterScheduler: Lost executor 1 on ocnosql84: remote Akka client shutdown 13/12/24 19:41:14 ERROR StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down. Sometimes it shows me different error info: [lh1@ocnosql84 src]$ java MyWordCount spark://ocnosql84:7077 hdfs://ocnosql76:8030/user/lh1/cdr_ismp_20130218 15000 1g 120 13/12/24 19:20:33 ERROR Client$ClientActor: Failed to connect to master org.jboss.netty.channel.ChannelPipelineException: Failed to initialize a pipeline.
at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:209) at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:183) at akka.remote.netty.ActiveRemoteClient$$anonfun$connect$1.apply$mcV$sp(Client.scala:173) at akka.util.Switch.liftedTree1$1(LockUtil.scala:33) at akka.util.Switch.transcend(LockUtil.scala:32) at akka.util.Switch.switchOn(LockUtil.scala:55) at akka.remote.netty.ActiveRemoteClient.connect(Client.scala:158) at akka.remote.netty.NettyRemoteTransport.send(NettyRemoteSupport.scala:153) at akka.remote.RemoteActorRef.$bang(RemoteActorRefProvider.scala:247) at org.apache.spark.deploy.client.Client$ClientActor.preStart(Client.scala:61) at akka.actor.ActorCell.create$1(ActorCell.scala:508) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:600) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:209) at akka.dispatch.Mailbox.run(Mailbox.scala:178) at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516) at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259) at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975) at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479) at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104) Caused by: java.lang.IllegalArgumentException: maxFrameLength must be a positive integer: -1451229184 at org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.<init>(LengthFieldBasedFrameDecoder.java:270) at org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.<init>(LengthFieldBasedFrameDecoder.java:236) at akka.remote.netty.ActiveRemoteClientPipelineFactory.getPipeline(Client.scala:340) at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:207) ... 18 more 13/12/24 19:20:33 ERROR SparkDeploySchedulerBackend: Disconnected from Spark cluster! 13/12/24 19:20:33 ERROR ClusterScheduler: Exiting due to error from cluster scheduler: Disconnected from Spark cluster It seems caused by LengthFieldBasedFrameDecoder lenDec = new
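For reference, a sketch of raising the frame size mentioned above (in MB), via the system property read at SparkContext creation in this era of Spark:

    System.setProperty("spark.akka.frameSize", "20")  // default is 10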
Re: IMPORTANT: Spark mailing lists moving to Apache by September 1st
I'd be fine with one-way mirrors here (Apache threads being reflected in Google groups) -- I have no idea how one is supposed to navigate the Apache list to look for historic threads. On Thu, Dec 19, 2013 at 7:58 PM, Mike Potts maspo...@gmail.com wrote: Thanks very much for the prompt and comprehensive reply! I appreciate the overarching desire to integrate with Apache: I'm very happy to hear that there's a move to use the existing groups as mirrors: that will overcome all of my objections: particularly if it's bidirectional! :) On Thursday, December 19, 2013 7:19:06 PM UTC-8, Andy Konwinski wrote: Hey Mike, As you probably noticed when you CC'd spark-de...@googlegroups.com, that list has already been reconfigured so that it no longer allows posting (and bounces emails sent to it). We will be doing the same thing to the spark...@googlegroups.com list too (we'll announce a date for that soon). That may sound very frustrating, and you are *not* alone in feeling that way. We've had a long conversation with our mentors about this, and I've felt very similar to you, so I'd like to give you some background. As I'm coming to see it, part of becoming an Apache project is moving the community *fully* over to Apache infrastructure, and more generally the Apache way of organizing the community. This applies in both the nuts-and-bolts sense of being on Apache infra, but possibly more importantly, it is also a guiding principle and way of thinking. In various ways, moving to Apache infra can be a painful process, and IMO the loss of all the great mailing list functionality that comes with using Google Groups is perhaps the most painful step. But basically, the de facto mailing lists need to be the Apache ones, and not Google Groups. The underlying reason is that Apache needs to take full accountability for recording and publishing the mailing lists; it has to be able to institutionally guarantee this. This is because discussion on mailing lists is one of the core things that defines an Apache community. So at a minimum this means Apache owning the master copy of the bits. All that said, we are discussing the possibility of having a Google group that subscribes to each list that would provide an easier-to-use and prettier archive for each list (so far we haven't gotten that to work). I hope this was helpful. It has taken me a few years now, and a lot of conversations with experienced (and patient!) Apache mentors, to internalize some of the nuance of the Apache way. That's why I wanted to share. Andy On Thu, Dec 19, 2013 at 6:28 PM, Mike Potts masp...@gmail.com wrote: I notice that there are still a lot of active topics in this group: and also activity on the Apache mailing list (which is a really horrible experience!). Is it a firm policy on Apache's front to disallow external groups? I'm going to be ramping up on Spark, and I really hate the idea of having to rely on the Apache archives and my mail client. Also: having to search for topics/keywords both in old threads (here) as well as new threads in Apache's (clunky) archive is going to be a pain! I almost feel like I must be missing something, because the current solution seems unfeasibly awkward!
Re: problems with standalone cluster
You might also check the spark/work/ directory for application (Executor) logs on the slaves. On Tue, Nov 19, 2013 at 6:13 PM, Umar Javed umarj.ja...@gmail.com wrote: I have a scala script that I'm trying to run on a Spark standalone cluster with just one worker (existing on the master node). But the application just hangs. Here's the worker log output at the time of starting the job: 13/11/19 18:03:13 INFO Worker: Asked to launch executor app-20131119180313-0001/0 for job 13/11/19 18:03:13 INFO ExecutorRunner: Launch command: java -cp :/homes/network/revtr/ujaved/incubator-spark/conf:/homes/network/revtr/ujaved/incubator-spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend akka://sp...@drone.cs.washington.edu:57653/user/StandaloneScheduler 0 drone.cs.washington.edu 16 app-20131119180313-0001 13/11/19 18:03:13 INFO Worker: Asked to kill executor app-20131119180313-0001/0 13/11/19 18:03:13 INFO ExecutorRunner: Killing process! 13/11/19 18:03:13 INFO Worker: Executor app-20131119180313-0001/0 finished with state KILLED 13/11/19 18:03:13 INFO ExecutorRunner: Redirection to /homes/network/revtr/ujaved/incubator-spark/work/app-20131119180313-0001/0/stdout closed: Stream closed 13/11/19 18:03:13 INFO ExecutorRunner: Redirection to /homes/network/revtr/ujaved/incubator-spark/work/app-20131119180313-0001/0/stderr closed: Bad file descriptor 13/11/19 18:03:13 ERROR Worker: key not found: app-20131119180313-0001/0 Why is the worker killed as soon as it is started? I should mention I don't have this problem when using pyspark. thanks! Umar
Re: groupBy() with really big groups fails
This is very likely due to memory issues. The problem is that each reducer (partition of the groupBy) builds an in-memory table of that partition. If you have very few partitions, this will fail, so the solution is to simply increase the number of reducers. For example: sc.parallelize(1 to 4).groupBy(x => x % 10, 256).takeSample(false, 10, 10) See also the Tuning Spark documentation on memory usage of reduce tasks: http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks On Mon, Dec 9, 2013 at 11:46 AM, Matt Cheah mch...@palantir.com wrote: Hi everyone, I was wondering if I could get any insight as to why the following query fails: scala> sc.parallelize(1 to 4).groupBy(x => x % 10).takeSample(false, 10, 10) some generic stuff happens 2013-12-09 19:42:30,756 [pool-3-thread-1] INFO org.apache.spark.network.ConnectionManager - Removing ReceivingConnection to ConnectionManagerId(ip-172-31-13-201.us-west-1.compute.internal,50351) 2013-12-09 19:42:30,813 [connection-manager-thread] INFO org.apache.spark.network.ConnectionManager - Key not valid ? sun.nio.ch.SelectionKeyImpl@f2d755 2013-12-09 19:42:30,756 [pool-3-thread-2] INFO org.apache.spark.network.ConnectionManager - Removing ReceivingConnection to ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961) 2013-12-09 19:42:30,814 [pool-3-thread-4] INFO org.apache.spark.network.ConnectionManager - Removing SendingConnection to ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961) 2013-12-09 19:42:30,814 [pool-3-thread-2] INFO org.apache.spark.network.ConnectionManager - Removing SendingConnection to ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961) 2013-12-09 19:42:30,814 [pool-3-thread-1] INFO org.apache.spark.network.ConnectionManager - Removing SendingConnection to ConnectionManagerId(ip-172-31-13-201.us-west-1.compute.internal,50351) 2013-12-09 19:42:30,815 [pool-3-thread-3] INFO org.apache.spark.network.ConnectionManager - Removing SendingConnection to ConnectionManagerId(ip-172-31-13-203.us-west-1.compute.internal,43126) 2013-12-09 19:42:30,818 [connection-manager-thread] INFO org.apache.spark.network.ConnectionManager - key already cancelled ? sun.nio.ch.SelectionKeyImpl@f2d755 java.nio.channels.CancelledKeyException at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:266) at org.apache.spark.network.ConnectionManager$$anon$3.run(ConnectionManager.scala:97) 2013-12-09 19:42:30,825 [pool-3-thread-4] INFO org.apache.spark.network.ConnectionManager - Removing ReceivingConnection to ConnectionManagerId(ip-172-31-13-203.us-west-1.compute.internal,43126) house of cards collapses Let me know if more of the logs would be useful, although it seems from this point on everything falls apart. I'm using an EC2-script launched cluster, 10 nodes, m2.4xlarge, 65.9GB of RAM per slave. Let me know if any other system configurations are relevant. In a more general sense – our use case has been involving large group-by queries, so I was trying to simulate this kind of workload in the spark shell. Is there any good way to consistently get these kinds of queries to work? Assume that during the general use-case it can't be known a priori how many groups there will be. Thanks, -Matt Cheah
Re: Serializable incompatible with Externalizable error
This discussion seems to indicate the possibility of a mismatch between one side being Serializable and the other being Externalizable: https://forums.oracle.com/thread/2147644 In general, the semantics of Serializable can be pretty strange as it doesn't really behave the same as usual interfaces. It may actually be possible that if a superclass implements Serializable and you implement Externalizable, things can screw up, for instance. On Tue, Dec 3, 2013 at 10:48 AM, Matt Cheah mch...@palantir.com wrote: That can't be – Externalizable is a specific sub-interface of Serializable that allows custom serialization formats. Granted, I could accomplish the same thing by implementing the private readObject() and writeObject() methods in this class, and implementing Serializable only. It just seems odd to me that I'd have to do so. Especially since the tuning guide suggests using Externalizable: http://spark.incubator.apache.org/docs/latest/tuning.html -Matt Cheah From: Andrew Ash and...@andrewash.com Reply-To: user@spark.incubator.apache.org user@spark.incubator.apache.org Date: Monday, December 2, 2013 10:45 PM To: user@spark.incubator.apache.org user@spark.incubator.apache.org Subject: Re: Serializable incompatible with Externalizable error At least from http://stackoverflow.com/questions/817853/what-is-the-difference-between-serializable-and-externalizable-in-java it looks like Externalizable is roughly an old-Java version of Serializable. Does that class implement both interfaces? Can you take away the Externalizable interface if it's being used? On Mon, Dec 2, 2013 at 7:15 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, I'm running into a case where I'm creating a Java RDD of an Externalizable class, and getting this stack trace: java.io.InvalidClassException (java.io.InvalidClassException: com.palantir.finance.datatable.server.spark.WritableDataRow; Serializable incompatible with Externalizable) java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:634) java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) java.io.ObjectInputStream.readClass(ObjectInputStream.java:1483) some other Java stuff java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153) I'm running on a spark cluster generated by the EC2 scripts. This doesn't happen if I'm running things with local[N]. Any ideas? Thanks, -Matt Cheah
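For context, Externalizable replaces default field-by-field serialization with two explicit methods. A minimal sketch (the class name and field are illustrative, not the class from this thread):

    import java.io.{Externalizable, ObjectInput, ObjectOutput}

    class Row(var values: Array[Double]) extends Externalizable {
      def this() = this(Array.empty[Double])  // Externalizable requires a public no-arg constructor
      def writeExternal(out: ObjectOutput): Unit = {
        out.writeInt(values.length)
        values.foreach(out.writeDouble)
      }
      def readExternal(in: ObjectInput): Unit = {
        values = Array.fill(in.readInt())(in.readDouble())
      }
    }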
Re: spark-shell not working on standalone cluster (java.io.IOException: Cannot run program compute-classpath.sh)
There is a pull request currently to fix this exact issue, I believe, at https://github.com/apache/incubator-spark/pull/192. It's very small and only touches the script files, so you could apply it to your current version and distribute it to the workers. The fix here is that you add an additional variable in spark-env that specifies the REMOTE_SPARK_HOME. On Mon, Nov 25, 2013 at 3:40 AM, Grega Kešpret gr...@celtra.com wrote: It seems there is already an open ticket for this - https://spark-project.atlassian.net/browse/SPARK-905 , but for version 0.7.3. Grega -- *Grega Kešpret* Analytics engineer Celtra — Rich Media Mobile Advertising celtra.com http://www.celtra.com/ | @celtramobile http://www.twitter.com/celtramobile On Mon, Nov 25, 2013 at 12:13 PM, Grega Kešpret gr...@celtra.com wrote: Sorry, forgot to mention, I run spark version v0.8.0-incubating from https://github.com/apache/incubator-spark.git. It seems to work when the local Spark directory is also /opt/spark, so I think this confirms my doubt that SPARK_HOME somehow doesn't get passed to the Executor? Grega -- *Grega Kešpret* Analytics engineer Celtra — Rich Media Mobile Advertising celtra.com http://www.celtra.com/ | @celtramobile http://www.twitter.com/celtramobile
Re: oome from blockmanager
Jerry, I need to correct what I said about the 100KB for each FastBufferedOutputStream -- this is actually a Spark buffer, not a compression buffer. The size can be configured using the spark.shuffle.file.buffer.kb System property, and it defaults to 100. I am still curious if you're using compression or seeing more than 48k DiskBlockObjectWriters to account for the remaining memory used. On Fri, Nov 22, 2013 at 9:05 AM, Aaron Davidson ilike...@gmail.com wrote: Great, thanks for the feedback. It sounds like you're using the LZF compression scheme -- switching to Snappy should use significantly less buffer space per DiskBlockObjectWriter, but this doesn't really solve the underlying problem. In general I've been thinking of Spark nodes as having high memory and a moderate number of cores, but with 24 cores and 40GB of memory, each core really doesn't get that much memory individually, despite every one needing its own set of DiskBlockObjectWriters. One thing that is a little odd is that with your numbers, you should have 2000 (reducers) * 24 (cores) = 48k DiskBlockObjectWriters. These should only require a total of 4.8GB for the entire node, though, rather than 80% of your JVM memory. Were you seeing significantly more than 48k DiskBlockObjectWriters? On Fri, Nov 22, 2013 at 1:38 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi Aaron, I've also met the same problem, that shuffle takes so much overhead for a large number of partitions. I think it is an important issue when processing large data. In my case I have 2000 mappers and 2000 reducers. I dumped the memory of an executor and found that byte arrays take about 80% of total JVM memory; they are referred to by FastBufferedOutputStream, and created by DiskBlockObjectWriter. It seems that there are so many instances of DiskBlockObjectWriter, and each DiskBlockObjectWriter will have a 100KB buffer for FastBufferedOutputStream by default. These buffers persist through the task execution period and cannot be garbage collected unless the task is finished. My cluster has 6 nodes, with 40G memory and 24 cores per node. I tried with 5000 partitions and this easily hit OOM. The dilemma is that my application needs a groupByKey transformation, which requires small partition sizes, but small partitions lead to larger partition counts, which also consume lots of memory. Thanks Jerry *From:* Aaron Davidson [mailto:ilike...@gmail.com] *Sent:* Friday, November 22, 2013 2:54 PM *To:* user@spark.incubator.apache.org *Subject:* Re: oome from blockmanager Thanks for your feedback; I think this is a very important issue on the usability front. One thing to consider is that at some data size, one simply needs larger or more nodes. m1.large is essentially the smallest ec2 instance size that can run a Spark job of any reasonable size. That's not an excuse for an OOM, really -- one should generally just see (heavily) degraded performance instead of actually failing the job. Additionally, the number of open files scales with the number of reducers in Spark, rather than, say, MapReduce, where each mapper only writes to one file, at the cost of later sorting the entire thing. This unfortunately means that adding nodes isn't really a full solution in your case, since each one would try to have 36k compressed output streams open. The short term solutions have already been discussed: decrease the number of reducers (and mappers, if you need them to be tied) or potentially turn off compression if Snappy is holding too much buffer space.
A third option would actually be to decrease the number of executors per node to 1, since that would double the available memory and roughly halve the usage. Clearly either of the latter two solutions will produce a significant slowdown, while the first should keep the same or better performance. While Spark is good at handling a large number of partitions, there is still some cost to schedule every task, as well as to store and forward the metadata for every shuffle block (which grows with R * M), so the ideal partition size is one that fits exactly into memory without OOMing -- although this is of course an unrealistic situation to aim for. The longer term solutions include algorithms which degrade gracefully instead of OOMing (although this would be a solution for too-large partitions instead of too-small ones, where the metadata and buffering become the issue) and potentially adopting a more Map-Reducey style of shuffling where we would only need to write to 1 file per executor at a time, with some significant processing and disk bandwidth cost. I am currently investigating shuffle file performance, and thanks to your feedback here, I'll additionally investigate the memory overheads inherent in shuffling as well. On Thu, Nov 21, 2013 at 10:20 PM, Stephen Haberman stephen.haber...@gmail.com wrote: More significant in shuffling data
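For reference, a sketch of shrinking the per-writer buffer mentioned above (in KB, via the system property named earlier in this thread; set before the SparkContext is created):

    System.setProperty("spark.shuffle.file.buffer.kb", "10")  // default is 100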
Re: debugging a Spark error
Have you looked at the Spark executor logs? They're usually located in the $SPARK_HOME/work/ directory. If you're running in a cluster, they'll be on the individual slave nodes. These should hopefully reveal more information. On Mon, Nov 18, 2013 at 3:42 PM, Chris Grier gr...@icsi.berkeley.edu wrote: Hi, I'm trying to figure out what the problem is with a job that we are running on Spark 0.7.3. When we write out via saveAsTextFile we get an exception that doesn't reveal much: 13/11/18 15:06:19 INFO cluster.TaskSetManager: Loss was due to java.io.IOException java.io.IOException: Map failed at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:849) at spark.storage.DiskStore.getBytes(DiskStore.scala:86) at spark.storage.DiskStore.getValues(DiskStore.scala:92) at spark.storage.BlockManager.getLocal(BlockManager.scala:284) at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1027) at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at spark.storage.BlockFetcherIterator.<init>(BlockManager.scala:1026) at spark.storage.BlockManager.getMultiple(BlockManager.scala:478) at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:51) at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:10) at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127) at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:115) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38) at spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:115) at spark.RDD.computeOrReadCheckpoint(RDD.scala:207) at spark.RDD.iterator(RDD.scala:196) at spark.MappedValuesRDD.compute(PairRDDFunctions.scala:704) at spark.RDD.computeOrReadCheckpoint(RDD.scala:207) at spark.RDD.iterator(RDD.scala:196) at spark.FlatMappedValuesRDD.compute(PairRDDFunctions.scala:714) at spark.RDD.computeOrReadCheckpoint(RDD.scala:207) at spark.RDD.iterator(RDD.scala:196) at spark.rdd.MappedRDD.compute(MappedRDD.scala:12) at spark.RDD.computeOrReadCheckpoint(RDD.scala:207) at spark.RDD.iterator(RDD.scala:196) at spark.rdd.MappedRDD.compute(MappedRDD.scala:12) at spark.RDD.computeOrReadCheckpoint(RDD.scala:207) at spark.RDD.iterator(RDD.scala:196) at spark.scheduler.ResultTask.run(ResultTask.scala:77) at spark.executor.Executor$TaskRunner.run(Executor.scala:100) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) Any ideas? -Chris
Re: EC2 node submit jobs to separate Spark Cluster
The main issue with running a spark-shell locally is that it orchestrates the actual computation, so you want it to be close to the actual Worker nodes for latency reasons. Running a spark-shell on EC2 in the same region as the Spark cluster avoids this problem. The error you're seeing seems to indicate a different issue. Check the Master web UI (accessible on port 8080 at the master's IP address) to make sure that Workers are successfully registered and they have the expected amount of memory available to Spark. You can also check to see how much memory your spark-shell is trying to get per executor. A couple of common problems are (1) an abandoned spark-shell is holding onto all of your cluster's resources or (2) you've manually configured your spark-shell to try to get more memory than your Workers have available. Both of these should be visible in the web UI. On Mon, Nov 18, 2013 at 5:00 PM, Matt Cheah mch...@palantir.com wrote: Hi, I'm working with an infrastructure that already has its own web server set up on EC2. I would like to set up a *separate* spark cluster on EC2 with the scripts and have the web server submit jobs to this spark cluster. Is it possible to do this? I'm getting some errors running the spark shell on the web server: "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory." I have heard that it's not possible for any local computer to connect to the spark cluster, but I was wondering if other EC2 nodes could have their firewalls configured to allow this. We don't want to deploy the web server on the master node of the spark cluster. Thanks, -Matt Cheah
Re: foreachPartition in Java
Also, in general, you can work around shortcomings in the Java API by converting to a Scala RDD (using JavaRDD's rdd() method). The API tends to be much clunkier since you have to jump through some hoops to talk to a Scala API in Java, though. In this case, JavaRDD's mapPartitions() method will likely be the cleanest solution, as Patrick said. On Sun, Nov 17, 2013 at 5:03 PM, Patrick Wendell pwend...@gmail.com wrote: Can you just call mapPartitions and ignore the result? - Patrick On Sun, Nov 17, 2013 at 4:45 PM, Yadid Ayzenberg ya...@media.mit.edu wrote: Hi, According to the API, foreachPartition() is not yet implemented in Java. Are there any workarounds to get the same functionality? I have a non-serializable DB connection and instantiating it is pretty expensive, so I prefer to do it on a per-partition basis. thanks, Yadid
Re: number of splits for standalone cluster mode
The number of splits can be configured when reading the file, as an argument to textFile(), sequenceFile(), etc (see the docs: http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkContext@textFile(String,Int):RDD[String]). Note that this is a minimum, however, as certain input sources may not allow partitions larger than a certain size (e.g., reading from HDFS may force partitions to be at most ~130 MB [depending on HDFS block size]). If you wish to have fewer partitions than the minimum your input source allows, you can use the RDD.coalesce() method (http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.RDD) to locally combine partitions. On Sun, Nov 17, 2013 at 1:33 PM, Umar Javed umarj.ja...@gmail.com wrote: Hi, When running Spark in the standalone cluster mode, is there a way to configure the number of splits for the input file(s)? It seems like it is approximately 32 MB for every core by default. Is that correct? For example, in my cluster there are two workers, each running on a machine with two cores. For an input file of size 500MB, Spark schedules 16 tasks for the initial map (500/32 ~ 16). thanks! Umar
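A sketch combining both knobs (assuming a spark-shell session; the path and numbers are illustrative):

    val lines = sc.textFile("hdfs:///data/input.txt", 64)  // request at least 64 splits
    val fewer = lines.coalesce(8)                          // locally merge down to 8 partitions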
Re: failure notice
Great, thanks for the update! Your ant version matches mine, though I can't reproduce the error. Weird. I have created SPARK-959 (https://spark-project.atlassian.net/browse/SPARK-959) to track this issue and PR #183 (https://github.com/apache/incubator-spark/pull/183) to hopefully solve the issue in the future. On Sun, Nov 17, 2013 at 10:08 PM, Egon Kidmose kidm...@gmail.com wrote: I applied the first approach in $SPARK_HOME/project/SparkBuild.scala (.scala, not .sbt, as the latter wasn't present). This solved the issue. Thanks for your help! ~/Downloads/spark-0.8.0-incubating $ ant -v Apache Ant(TM) version 1.8.2 compiled on May 18 2012 Mvh/BR Egon Kidmose On Sun, Nov 17, 2013 at 7:40 PM, Aaron Davidson ilike...@gmail.com wrote: Could you report your ant/Ivy version? Just run ant -version The fundamental problem is that Ivy is stupidly thinking .orbit is the file extension when it should be .jar. There are two possible fixes you can try, and please let us know if one or the other works. In $SPARK_HOME/project/SparkBuild.sbt, find the line that contains the text "org.eclipse.jetty" and add this line directly after it: "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"), Hopefully this will force Ivy to do the right thing. If not, you can try the solution from the mailing list in the same file, by adding the lines: ivyXML := <dependency org="org.eclipse.jetty.orbit" name="javax.servlet" rev="2.5.0.v201103041518"> <artifact name="javax.servlet" type="orbit" ext="jar"/> </dependency> This solution ideally just does the same thing, albeit in a slightly less SBT-ish way. On Sun, Nov 17, 2013 at 6:44 PM, Egon Kidmose kidm...@gmail.com wrote: Hi All, I'm trying to get started on using spark, but following the quick start guide I encounter an error. As per documentation I run sbt/sbt assembly from the root folder, but I get an error for downloading javax.servlet.orbit. (see below) It's the same result for 0.8.0 and what's on github. It seems similar to the issue earlier discussed here: http://mail-archives.apache.org/mod_mbox/spark-user/201309.mbox/%3ccajbo4nexyzqe6zgreqjtzzz5zrcoavfen+wmbyced6n1epf...@mail.gmail.com%3E I don't understand how to apply this to my problem. (Which *.sbt should the line be entered into?) Any advice on how to solve this? Thanks! Mvh/BR Egon Kidmose ~/Downloads/spark-0.8.0-incubating $ uname -a Linux ekidmose-laptop-linux 3.8.0-19-generic #30-Ubuntu SMP Wed May 1 16:35:23 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux ~/Downloads/spark-0.8.0-incubating $ javac -version javac 1.7.0_25 ~/Downloads/spark-0.8.0-incubating $ sbt/sbt assembly [info] Loading project definition from /home/ekidmose/Downloads/spark-0.8.0-incubating/project/project [info] Updating {file:/home/ekidmose/Downloads/spark-0.8.0-incubating/project/project/}default-8557a6... [info] Resolving org.scala-sbt#precompiled-2_10_1;0.12.4 ... [info] Done updating. [info] Compiling 1 Scala source to /home/ekidmose/Downloads/spark-0.8.0-incubating/project/project/target/scala-2.9.2/sbt-0.12/classes... [info] Loading project definition from /home/ekidmose/Downloads/spark-0.8.0-incubating/project [info] Updating {file:/home/ekidmose/Downloads/spark-0.8.0-incubating/project/}plugins... [info] Resolving org.scala-sbt#precompiled-2_10_1;0.12.4 ... [info] Done updating. [info] Compiling 1 Scala source to /home/ekidmose/Downloads/spark-0.8.0-incubating/project/target/scala-2.9.2/sbt-0.12/classes...
[info] Set current project to root (in build file:/home/ekidmose/Downloads/spark-0.8.0-incubating/) [info] Updating {file:/home/ekidmose/Downloads/spark-0.8.0-incubating/}core... [info] Resolving org.apache.derby#derby;10.4.2.0 ... [warn] [NOT FOUND ] org.eclipse.jetty.orbit#javax.servlet;2.5.0.v201103041518!javax.servlet.orbit (77ms) [warn] public: tried [warn] http://repo1.maven.org/maven2/org/eclipse/jetty/orbit/javax.servlet/2.5.0.v201103041518/javax.servlet-2.5.0.v201103041518.orbit [warn] :: [warn] :: FAILED DOWNLOADS:: [warn] :: ^ see resolution messages for details ^ :: [warn] :: [warn] :: org.eclipse.jetty.orbit#javax.servlet;2.5.0.v201103041518!javax.servlet.orbit [warn] :: sbt.ResolveException: download failed: org.eclipse.jetty.orbit#javax.servlet;2.5.0.v201103041518!javax.servlet.orbit at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:214) at sbt.IvyActions$$anonfun$update$1.apply(IvyActions.scala:122) at sbt.IvyActions$$anonfun$update$1.apply(IvyActions.scala:121) at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:117) at sbt.IvySbt$Module
Re: Memory configuration in local mode
I was under the impression that he was using the same JVM for Spark and other stuff, and wanted to limit how much of it Spark could use. Patrick's solution is of course the right way to go if that's not the case. On Sat, Nov 16, 2013 at 9:40 AM, Patrick Wendell pwend...@gmail.com wrote: If you are using local mode, you can just pass -Xmx32g to the JVM that is launching Spark and it will have that much memory. On Fri, Nov 15, 2013 at 6:30 PM, Aaron Davidson ilike...@gmail.com wrote: One possible workaround would be to use the local-cluster Spark mode. This is normally used only for testing, but it will actually spawn a separate process for the executor. The format is: new SparkContext("local-cluster[1,4,32000]") This will spawn 1 Executor that is allocated 4 cores and 32 GB (approximated as 32000 MB). Since this is a separate process with its own JVM, you'd probably want to just change your original JVM's memory to 32 GB. Note that since local-cluster mode more closely simulates a cluster, it's possible that certain issues like dependency problems may arise that don't appear when using local mode. On Fri, Nov 15, 2013 at 11:43 AM, Alex Boisvert alex.boisv...@gmail.com wrote: When starting a local-mode Spark instance, e.g., new SparkContext("local[4]"), what memory configuration options are available/considered to limit Spark's memory usage? For instance, if I have a JVM with 64GB and would like to reserve/limit Spark to using only 32GB of the heap. thanks!
Re: Memory configuration in local mode
One possible workaround would be to use the local-cluster Spark mode. This is normally used only for testing, but it will actually spawn a separate process for the executor. The format is: new SparkContext("local-cluster[1,4,32000]") This will spawn 1 Executor that is allocated 4 cores and 32 GB (approximated as 32000 MB). Since this is a separate process with its own JVM, you'd probably want to just change your original JVM's memory to 32 GB. Note that since local-cluster mode more closely simulates a cluster, it's possible that certain issues like dependency problems may arise that don't appear when using local mode. On Fri, Nov 15, 2013 at 11:43 AM, Alex Boisvert alex.boisv...@gmail.com wrote: When starting a local-mode Spark instance, e.g., new SparkContext("local[4]"), what memory configuration options are available/considered to limit Spark's memory usage? For instance, if I have a JVM with 64GB and would like to reserve/limit Spark to using only 32GB of the heap. thanks!
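To make the workaround above concrete, a minimal sketch using the 0.8-era constructor (the app name is a placeholder):

    import org.apache.spark.SparkContext
    // local-cluster[numExecutors, coresPerExecutor, memoryPerExecutorMB]:
    // one executor JVM with 4 cores and ~32 GB, separate from the driver JVM.
    val sc = new SparkContext("local-cluster[1,4,32000]", "myApp")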
Re: mapping of shuffle outputs to reduce tasks
It is responsible for a subset of shuffle blocks. Map tasks split up their data, creating one shuffle block for every reducer. During the shuffle phase, the reducer will fetch all shuffle blocks that were intended for it. On Sun, Nov 10, 2013 at 9:38 PM, Umar Javed umarj.ja...@gmail.com wrote: I was wondering how the scheduler assigns the ShuffledRDD locations to the reduce tasks. Say that you have 4 reduce tasks, and a number of shuffle blocks across two machines. Is each reduce task responsible for a subset of individual keys or a subset of shuffle blocks? Umar
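To illustrate the mapping: with M map tasks and R reducers, each map task writes R shuffle blocks, and reducer r fetches block (m, r) from every map m. A tiny sketch (the shuffle_<shuffleId>_<mapId>_<reduceId> naming follows Spark's shuffle block-id convention; the helper itself is hypothetical):

    // Which blocks reducer r fetches, for a shuffle with 4 map tasks:
    val numMaps = 4
    def blocksForReducer(r: Int): Seq[String] =
      (0 until numMaps).map(m => "shuffle_0_" + m + "_" + r)
    // blocksForReducer(2) == Seq("shuffle_0_0_2", "shuffle_0_1_2", "shuffle_0_2_2", "shuffle_0_3_2")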
Re: oome from blockmanager
As a followup on this, the memory footprint of all shuffle metadata has been greatly reduced. For your original workload with 7k mappers, 7k reducers, and 5 machines, the total metadata size should have decreased from ~3.3 GB to ~80 MB. On Tue, Oct 29, 2013 at 9:07 AM, Aaron Davidson ilike...@gmail.com wrote: Great! Glad to hear it worked out. Spark definitely has a pain point about deciding the right number of partitions, and I think we're going to be spending a lot of time trying to reduce that issue. Currently working on the patch to reduce the shuffle file block overheads, but in the meantime, you can set spark.shuffle.consolidateFiles=false to exchange OOMEs due to too many partitions for worse performance (probably an acceptable tradeoff). On Mon, Oct 28, 2013 at 2:31 PM, Stephen Haberman stephen.haber...@gmail.com wrote: Hey guys, As a follow-up, I raised our target partition size to 600 MB (up from 64 MB), which split this report's 500 GB of tiny S3 files into ~700 partitions, and everything ran much smoother. In retrospect, this was the same issue we'd run into before, having too many partitions, and had previously solved by throwing some guesses at coalesce to make it magically go away. But now I feel like we have a much better understanding of why the numbers need to be what they are, which is great. So, thanks for all the input and helping me understand what's going on. It'd be great to see some of the optimizations to BlockManager happen, but I understand in the end why it needs to track what it does. And I was also admittedly using a small cluster anyway. - Stephen
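For a rough sense of scale (a back-of-envelope estimate, assuming the metadata is dominated by per-block entries): 7,000 mappers x 7,000 reducers = 49 million shuffle blocks, so ~3.3 GB works out to roughly 70 bytes of metadata per block before the fix, versus under 2 bytes per block after.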
Re: Executor could not connect to Driver?
I've seen this happen before due to the driver doing long GCs when the driver machine was heavily memory-constrained. For this particular issue, simply freeing up memory used by other applications fixed the problem. On Fri, Nov 1, 2013 at 12:14 AM, Liu, Raymond raymond@intel.com wrote: Hi, I am encountering an issue where the executor actor cannot connect to the driver actor, but I cannot figure out the reason. Say the Driver actor is listening on :35838 root@sr434:~# netstat -lpv Active Internet connections (only servers) Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name tcp 0 0 *:50075 *:* LISTEN 18242/java tcp 0 0 *:50020 *:* LISTEN 18242/java tcp 0 0 *:ssh *:* LISTEN 1325/sshd tcp 0 0 *:50010 *:* LISTEN 18242/java tcp6 0 0 sr434:35838 [::]:* LISTEN 9420/java tcp6 0 0 [::]:40390 [::]:* LISTEN 9420/java tcp6 0 0 [::]:4040 [::]:* LISTEN 9420/java tcp6 0 0 [::]:8040 [::]:* LISTEN 28324/java tcp6 0 0 [::]:60712 [::]:* LISTEN 28324/java tcp6 0 0 [::]:8042 [::]:* LISTEN 28324/java tcp6 0 0 [::]:34028 [::]:* LISTEN 9420/java tcp6 0 0 [::]:ssh [::]:* LISTEN 1325/sshd tcp6 0 0 [::]:45528 [::]:* LISTEN 9420/java tcp6 0 0 [::]:13562 [::]:* LISTEN 28324/java while the executor reports errors as below: 13/11/01 13:16:43 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka://spark@sr434:35838/user/CoarseGrainedScheduler 13/11/01 13:16:43 ERROR executor.CoarseGrainedExecutorBackend: Driver terminated or disconnected! Shutting down. Any idea? Best Regards, Raymond Liu
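One way to check the long-GC theory (an assumption on my part, not something confirmed in this thread) is to start the driver JVM with GC logging enabled, e.g. by adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to its JVM options, and look for multi-second pauses around the time of the disconnect.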
Re: repartitioning RDDS
Stephen is exactly correct; I just wanted to point out that in Spark 0.8.1 and above, the repartition function has been added as a clearer way to accomplish what you want. (Coalescing into a larger number of partitions doesn't make much linguistic sense.) On Thu, Oct 31, 2013 at 7:48 AM, Stephen Haberman stephen.haber...@gmail.com wrote: Is it possible to repartition RDDs other than by the coalesce method? I am primarily interested in making finer-grained partitioning or rebalancing an unbalanced partitioning, without coalescing. I believe if you use the shuffle=true parameter, coalesce will do what you want, and essentially becomes a general repartition method. Specifically, while shuffle=false can only make larger partitions, with shuffle=true you can break your partitions up into many smaller partitions, with the content based on a hash partitioner. I believe that's what you're asking for? - Stephen
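Concretely, the two spellings side by side (a sketch; rdd and the partition counts are placeholders):

    // shuffle=true: hash-partitions the data, so you can go to *more* partitions
    val finer = rdd.coalesce(1000, shuffle = true)
    val same  = rdd.repartition(1000)  // 0.8.1+: equivalent, just a clearer name
    // shuffle=false: can only merge into fewer, larger partitions
    val fewer = rdd.coalesce(10)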
Re: Spark cluster memory configuration for spark-shell
You are correct. If you are just using spark-shell in local mode (i.e., without a cluster), you can set the SPARK_MEM environment variable to give the driver more memory. E.g.: SPARK_MEM=24g ./spark-shell Otherwise, if you're using a real cluster, the driver shouldn't require a significant amount of memory, so SPARK_MEM should not have to be used. On Tue, Oct 29, 2013 at 12:40 PM, Soumya Simanta soumya.sima...@gmail.com wrote: I'm new to Spark. I want to try out a few simple examples from the Spark shell. However, I'm not sure how to configure it so that I can make the max. use of memory on my workers. On average I've around 48 GB of RAM on each node on my cluster. I've around 10 nodes. Based on the documentation I could find memory-based configuration in two places. *1. $SPARK_INSTALL_DIR/dist/conf/spark-env.sh* *SPARK_WORKER_MEMORY* Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB); note that each application's *individual* memory is configured using its spark.executor.memory property. *2. The spark.executor.memory JVM property.* spark.executor.memory (default 512m): Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. 512m, 2g). http://spark.incubator.apache.org/docs/latest/configuration.html#system-properties In my case I want to use the max. memory possible on each node. My understanding is that I don't have to change *SPARK_WORKER_MEMORY* and I will have to increase spark.executor.memory to something big (e.g., 24g or 32g). Is this correct? If yes, what is the correct way of setting this property if I just want to use the spark-shell? Thanks. -Soumya
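Putting the answer together, a sketch of the 0.8-era way to raise the per-executor memory from application code (the values and app name are placeholders; the property must be set before the SparkContext is created):

    // Leave SPARK_WORKER_MEMORY at its default; raise this application's share.
    System.setProperty("spark.executor.memory", "32g")
    val sc = new org.apache.spark.SparkContext("spark://master:7077", "shell-experiments")

For spark-shell itself, the same property can be passed through the shell's JVM options (e.g. SPARK_JAVA_OPTS="-Dspark.executor.memory=32g" in spark-env.sh); that route is an assumption based on the 0.8 configuration docs rather than this thread.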
Re: gc/oome from 14,000 DiskBlockObjectWriters
Snappy sounds like it'd be a better solution here. LZF requires a pretty sizeable buffer per stream (accounting for the 300k you're seeing). It looks like you have 7000 reducers, and each one requires an LZF-compressed stream. Snappy has a much lower overhead per stream, so I'd give it a try. Thanks, by the way, for the very detailed problem description and trying it on Spark 0.8! Hopefully we can get this issue resolved. On Fri, Oct 25, 2013 at 8:30 AM, Stephen Haberman stephen.haber...@gmail.com wrote: We're moving from Spark 0.7.x to Spark 0.8 (or master actually, to get the latest shuffle improvement) FWIW I went back from Spark master to Spark 0.8 and am seeing the same behavior: OOMEs because of ~14,000 DiskBlockObjectWriters with 300k of memory each. So, it is not a change from the recent shuffle patch. - Stephen
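For reference, the codec switch being suggested looks like this on 0.8 (a sketch; set it wherever you set your other Spark properties, before the SparkContext is created):

    // Swap shuffle compression from LZF (the 0.8 default) to Snappy, trading
    // a little compression ratio for a much smaller per-stream buffer.
    System.setProperty("spark.io.compression.codec",
      "org.apache.spark.io.SnappyCompressionCodec")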
Re: Broken link in quickstart
Thanks for the heads up! I have submitted a pull request to fix it (#98, https://github.com/apache/incubator-spark/pull/98), so it should be corrected soon. In the meantime, if anyone is curious, the real link should be http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.RDD . On Tue, Oct 22, 2013 at 3:21 AM, Sebastian Schelter s...@apache.org wrote: No, the link to https://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.RDD gives a 404. On 22.10.2013 12:19, Jesvin Jose wrote: Do you mean the error that the SSL certificates are valid only for *.apache.org? On Tue, Oct 22, 2013 at 12:59 PM, Sebastian Schelter s...@apache.org wrote: The link to the RDD API doc on https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#transformations points to a non-existing page. Best, Sebastian
Re: Help with Initial Cluster Configuration / Tuning
On the other hand, I totally agree that memory usage in Spark is rather opaque, and is one area where we could do a lot better in terms of communicating issues, through both docs and instrumentation. At least with serialization and such, you can get meaningful exceptions (hopefully), but OOMs are just a blanket "something wasn't right somewhere." Debugging them empirically would require deep diving into Spark's heap allocations, which requires a lot more knowledge of Spark internals than should be required for general usage. On Tue, Oct 22, 2013 at 9:22 AM, Mark Hamstra m...@clearstorydata.com wrote: Yes, but that also illustrates the problem faced by anyone trying to write a little white paper or guidelines to make newbies' experience painless. Distributed computing clusters are necessarily complex things, and problems can crop up in multiple locations, layers or subsystems. It's just not feasible to quickly bring up to speed someone with no experience in distributed programming and cluster systems. It takes a lot of knowledge, both broad and deep. Very few people have the complete scope of knowledge and experience required, so creating, debugging and maintaining a cluster computing application almost always has to be a team effort. Support organizations and communities can replace some of the need for a knowledgeable and well-functioning team, but not all of it; and at some point you have to expect that debugging is going to take a considerable amount of painstaking, systematic effort -- including a close reading of the available docs. Several people are working on making more and better reference and training material available, and some of that will include trouble-shooting guidance, but that doesn't mean that there can ever be one little paper to solve newbies' (or more experienced developers') problems or provide adequate guidance. There's just too much to cover and too many different kinds or levels of initial-user knowledge to make that completely feasible. On Tue, Oct 22, 2013 at 8:50 AM, Shay Seng s...@1618labs.com wrote: Hey Mark, I didn't mean to say that the information isn't out there -- just that when something goes wrong with Spark, the scope of what could be wrong is so large: some bad setting with the JVM, serializer, or Akka; badly written Scala code; a wrong algorithm; check worker logs; check executor stderrs... When I looked at this post this morning, my initial thought wasn't that countByValue would be at fault, probably since I've only been using Scala/Spark for a month or so. It was just a suggestion to help newbies come up to speed more quickly and gain insights into how to debug issues. On Tue, Oct 22, 2013 at 8:14 AM, Mark Hamstra m...@clearstorydata.com wrote: There's no need to guess at that. The docs tell you directly: def countByValue(): Map[T, Long] Return the count of each unique value in this RDD as a map of (value, count) pairs. The final combine step happens locally on the master, equivalent to running a single reduce task. On Tue, Oct 22, 2013 at 7:22 AM, Shay Seng s...@1618labs.com wrote: Hi Matei, I've seen several memory tuning queries on this mailing list, and also heard the same kinds of queries at the Spark meetup. In fact the last bullet point in Josh Carver's(?) slides, the guy from Bizo, was "memory tuning is still a mystery". I certainly had lots of issues when I first started. From memory issues to GC issues, things seem to run fine until you try something with 500GB of data, etc. 
I was wondering if you could write up a little white paper or some guidelines on how to set memory values, and what to look at when something goes wrong? E.g., I would never have guessed that countByValue happens on a single machine, etc. On Oct 21, 2013 6:18 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi there, The problem is that countByValue happens in only a single reduce task -- this is probably something we should fix, but it's basically not designed for lots of values. Instead, do the count in parallel as follows: val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b) If this still has trouble, you can also increase the level of parallelism of reduceByKey by passing it a second parameter for the number of tasks (e.g. 100). BTW, one other small thing with your code: flatMap should actually work fine if your function returns an Iterator or Traversable, so there's no need to call toList and return a Seq in ngrams; you can just return an Iterator[String]. Matei On Oct 21, 2013, at 1:05 PM, Timothy Perrigo tperr...@gmail.com wrote: Hi everyone, I am very new to Spark, so as a learning exercise I've set up a small cluster consisting of 4 EC2 m1.large instances (1 master, 3 slaves), which I'm hoping to use to calculate ngram frequencies from text files of various sizes (I'm not doing anything with them; I just thought this would be slightly more interesting
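Written out, the parallel count Matei describes (a sketch; mapped is assumed to be the RDD of ngram strings, and 100 is an illustrative task count):

    val counts = mapped
      .map(str => (str, 1))
      .reduceByKey((a, b) => a + b, 100)  // 100 reduce tasks instead of countByValue's single one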
Re: Spark REPL produces error on a piece of scala code that works in pure Scala REPL
Out of curiosity, does the Scala 2.10 Spark interpreter patch fix this using macros as Matei suggests in the linked discussion? Or is that still future work, but now possible? On Fri, Oct 11, 2013 at 6:04 PM, Reynold Xin r...@apache.org wrote: This is a known problem and has to do with a peculiarity of the Scala shell: https://groups.google.com/forum/#!searchin/spark-users/error$3A$20type$20mismatch|sort:relevance/spark-users/bwAmbUgxWrA/HwP4Nv4adfEJ On Fri, Oct 11, 2013 at 6:01 PM, Aaron Davidson ilike...@gmail.com wrote: Playing around with this a little more, it seems that classOf[Animal] is this.Animal in Spark and Animal in normal Scala. Also, trying to do something like this: class Zoo[A <: this.Animal](thing: A) { } works in Scala but throws a weird error in Spark: error: type Animal is not a member of this.$iwC On Fri, Oct 11, 2013 at 4:55 PM, Shay Seng s...@1618labs.com wrote: Hey, I'm seeing a funny situation where a piece of code executes in a pure Scala REPL but not in a Spark shell. I'm using Scala 2.9.3 with Spark 0.8.0 In Spark I see: class Animal() { def says(): String = "???" } val amimal = new Animal amimal: this.Animal = Animal@df27cd5 class Zoo[A <: Animal](thing: A) { def whoami() = thing.getClass def chat() = thing.says } val z = new Zoo[Animal](amimal) console:16: error: type mismatch; found : this.Animal required: this.Animal val z = new Zoo[Animal](amimal) ^ But if I run the exact code in the Scala REPL: val z = new Zoo[Animal](amimal) z: Zoo[Animal] = Zoo@738ff53f Both REPLs report using Scala 2.9.3 Spark: Using Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_40) Scala: Welcome to Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_40). Any ideas? tks, Shay
Re: Spark REPL produces error on a piece of scala code that works in pure Scala REPL
Playing around with this a little more, it seems that classOf[Animal] is this.Animal in Spark and Animal in normal Scala. Also, trying to do something like this: class Zoo[A <: this.Animal](thing: A) { } works in Scala but throws a weird error in Spark: error: type Animal is not a member of this.$iwC On Fri, Oct 11, 2013 at 4:55 PM, Shay Seng s...@1618labs.com wrote: Hey, I'm seeing a funny situation where a piece of code executes in a pure Scala REPL but not in a Spark shell. I'm using Scala 2.9.3 with Spark 0.8.0 In Spark I see: class Animal() { def says(): String = "???" } val amimal = new Animal amimal: this.Animal = Animal@df27cd5 class Zoo[A <: Animal](thing: A) { def whoami() = thing.getClass def chat() = thing.says } val z = new Zoo[Animal](amimal) console:16: error: type mismatch; found : this.Animal required: this.Animal val z = new Zoo[Animal](amimal) ^ But if I run the exact code in the Scala REPL: val z = new Zoo[Animal](amimal) z: Zoo[Animal] = Zoo@738ff53f Both REPLs report using Scala 2.9.3 Spark: Using Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_40) Scala: Welcome to Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_40). Any ideas? tks, Shay
Re: spark_ec2 script in 0.8.0 and mesos
Also, please post feature requests here: http://spark-project.atlassian.net Make sure to search prior to posting to avoid duplicates. On Tue, Oct 8, 2013 at 11:50 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi Shay, We actually don't support Mesos in the EC2 scripts anymore -- sorry about that. If you want to deploy Mesos on EC2, I'd recommend looking at Mesos's own EC2 scripts. Then it's fairly easy to launch Spark on there. If you want to deploy Mesos locally you can go through the Spark docs for that. Matei On Oct 8, 2013, at 10:15 AM, Shay Seng s...@1618labs.com wrote: Hi, The "running on Mesos" web page (http://spark.incubator.apache.org/docs/latest/running-on-mesos.html) says that the spark_ec2 script provides an easy way to launch a Spark cluster with Mesos configured. In 0.7.3, the spark_ec2 script had a --cluster-type option, but that seems to have been taken out of the 0.8.0 version... Any tips on how I can configure and run Spark on Mesos? Also, I have a couple of suggestions for the spark_ec2 scripts; is there somewhere I can post feature requests? tks, Shay
Re: spark through vpn, SPARK_LOCAL_IP
You might also try setting spark.driver.host to the correct IP in the conf/spark-env.sh SPARK_JAVA_OPTS as well, e.g., -Dspark.driver.host=192.168.250.47 On Sat, Oct 5, 2013 at 2:45 PM, Aaron Babcock aaron.babc...@gmail.com wrote: Hello, I am using Spark through a VPN. My driver machine ends up with two IP addresses, one routable from the cluster and one not. Things generally work when I set the SPARK_LOCAL_IP environment variable to the proper IP address. However, when I try to use the take function, i.e., myRdd.take(1), I run into a hiccup. From the logfiles on the workers I can see that they are trying to connect to the non-routable IP address; they are not respecting SPARK_LOCAL_IP somehow. Here is the relevant worker log snippet; 192.168.250.47 is the correct routable IP address of the driver, 192.168.0.7 is the incorrect address of the driver. Any thoughts about what else I need to configure? 13/10/05 16:17:36 INFO ConnectionManager: Accepted connection from [192.168.250.47/192.168.250.47] 13/10/05 16:18:41 WARN SendingConnection: Error finishing connection to /192.168.0.7:60513 java.net.ConnectException: Connection timed out at sun.nio.ch.SocketChannelImpl.$$YJP$$checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.checkConnect(SocketChannelImpl.java) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567) at spark.network.SendingConnection.finishConnect(Connection.scala:221) at spark.network.ConnectionManager.spark$network$ConnectionManager$$run(ConnectionManager.scala:127) at spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:70) 13/10/05 16:18:41 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(192.168.0.7,60513) 13/10/05 16:18:41 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(192.168.0.7,60513)
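As a sketch of the suggestion above (the address is the routable driver IP from the log; if you set properties from application code instead of spark-env.sh, this must happen before the SparkContext is created):

    // Pin the address that workers use to reach back to the driver.
    System.setProperty("spark.driver.host", "192.168.250.47")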