[incubating-0.9.0] Too Many Open Files on Workers
Hey guys, I've got this issue (see bottom) with Spark, deployed in standalone mode on a local Docker environment. I know that I need to raise the ulimit (it's only 1024 now), but in the meantime I was just wondering how this could happen. My gut feeling is that because I'm holding a lot in memory, Spark tries to dump some RDDs to the filesystem, and then boom. Also, I was wondering whether it could be a clue that my job is maybe too eager with memory? Or is it something quite normal with such a low ulimit on workers? Thanks a lot (in advance ^^) Cheers, andy

14/02/21 08:32:15 ERROR Executor: Exception in task ID 472
org.jboss.netty.channel.ChannelException: Failed to create a selector.
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:337)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.init(AbstractNioSelector.java:95)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.init(AbstractNioWorker.java:53)
        at org.jboss.netty.channel.socket.nio.NioWorker.init(NioWorker.java:45)
        at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
        at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:99)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:69)
        at org.jboss.netty.channel.socket.nio.NioWorkerPool.init(NioWorkerPool.java:39)
        at org.jboss.netty.channel.socket.nio.NioWorkerPool.init(NioWorkerPool.java:33)
        at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.init(NioClientSocketChannelFactory.java:151)
        at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.init(NioClientSocketChannelFactory.java:116)
        at com.datastax.driver.core.Connection$Factory.init(Connection.java:349)
        at com.datastax.driver.core.Connection$Factory.init(Connection.java:360)
        at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:857)
        at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:806)
        at com.datastax.driver.core.Cluster.init(Cluster.java:76)
        at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:132)
        at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:771)
        at com.virdata.core.batch.sample.Timeseries$$anonfun$storeInCassandra$1$1$$anonfun$apply$1$$anonfun$apply$2.apply(Timeseries.scala:45)
        at com.virdata.core.batch.sample.Timeseries$$anonfun$storeInCassandra$1$1$$anonfun$apply$1$$anonfun$apply$2.apply(Timeseries.scala:38)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:595)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:595)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:45)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:45)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
Caused by: java.io.IOException: Too many open files
        at sun.nio.ch.IOUtil.makePipe(Native Method)
        at sun.nio.ch.EPollSelectorImpl.init(EPollSelectorImpl.java:65)
        at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
        at java.nio.channels.Selector.open(Selector.java:227)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:335)
        ... 37 more
14/02/21 08:32:53 WARN BlockManagerMaster: Error sending message to BlockManagerMaster in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:162)
        at org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:52)
        at org.apache.spark.storage.BlockManager.org
Re: [incubating-0.9.0] Too Many Open Files on Workers
MMMmmmh, good point!! Before answering: I tried to use Calliope but I hit an issue, and since the iteration review was near I quickly switched to the DataStax driver. But I'll get back to Calliope soon, with some questions maybe ;-). Regarding your point (a very good one, I have to say), I'm actually creating a session and a batch per partition. Now the shameful part... I haven't set any options for the pool :-/. Are there any tuning clues? In my case the C* node is local (Docker image), so maybe I should do builder.poolingOptions().setMaxConnectionsPerHost(LOCAL, BIGNUMBER)? The point is, what about this BIGNUMBER... can it really be big? (Sounds weird to me, but I don't want to pre-filter options based on feelings.) Thanks for your response, andy

On Fri, Feb 21, 2014 at 10:36 AM, Sourav Chandra sourav.chan...@livestream.com wrote: From the stacktrace it looks like you are using the DataStax Cassandra driver and it tried to create a Cluster. How many connections are you creating in poolingOptions(), i.e. builder.poolingOptions().setMaxConnectionsPerHost(...)? Are you creating this per RDD? There might be lots of connections created, and at last it failed to create any more. Thanks, Sourav

On Fri, Feb 21, 2014 at 3:02 PM, andy petrella andy.petre...@gmail.com wrote: Hey guys, I've got this issue (see bottom) with Spark, deployed in standalone mode on a local Docker environment. I know that I need to raise the ulimit (it's only 1024 now), but in the meantime I was just wondering how this could happen. My gut feeling is that because I'm holding a lot in memory, Spark tries to dump some RDDs to the filesystem, and then boom. Also, I was wondering whether it could be a clue that my job is maybe too eager with memory? Or is it something quite normal with such a low ulimit on workers? Thanks a lot (in advance ^^) Cheers, andy

14/02/21 08:32:15 ERROR Executor: Exception in task ID 472
org.jboss.netty.channel.ChannelException: Failed to create a selector.
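The fix Sourav is pointing at amounts to building one Cluster/Session per partition (or per executor) instead of one per element, and closing it when the partition's writes are done. Below is a minimal sketch under those assumptions; the Event case class, the demo_ks keyspace, the events table and the contact point are all hypothetical, and cluster.close() assumes DataStax Java driver 2.0.2+ (earlier releases expose shutdown() instead).

```scala
import com.datastax.driver.core.{Cluster, HostDistance, PoolingOptions}
import org.apache.spark.rdd.RDD

// Hypothetical row type standing in for the thread's time-series data.
case class Event(id: String, ts: Long, value: Double)

object StoreInCassandraSketch {

  def storeInCassandra(rdd: RDD[Event], contactPoint: String): Unit = {
    rdd.foreachPartition { rows =>
      // One Cluster/Session per partition, never one per row: every Cluster
      // opens sockets and an epoll selector, which is exactly what eats the
      // 1024 file descriptors in the stack trace above.
      val pooling = new PoolingOptions()
        .setMaxConnectionsPerHost(HostDistance.LOCAL, 8) // modest, not BIGNUMBER
      val cluster = Cluster.builder()
        .addContactPoint(contactPoint)
        .withPoolingOptions(pooling)
        .build()
      val session = cluster.connect("demo_ks") // hypothetical keyspace
      try {
        val insert = session.prepare(
          "INSERT INTO events (id, ts, value) VALUES (?, ?, ?)") // hypothetical table
        rows.foreach { e =>
          session.execute(insert.bind(e.id, Long.box(e.ts), Double.box(e.value)))
        }
      } finally {
        cluster.close() // also releases the sockets and selectors
      }
    }
  }
}
```

Going one step further, a lazily initialized singleton (one Cluster per executor JVM, reused across partitions) keeps the descriptor count flat no matter how many tasks run.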
Re: DataFrame RDDs
Indeed, the Scala version could be a blocker (I'm not sure why it needs 2.11, maybe Miles uses quasiquotes...) Andy

On Tue, Nov 19, 2013 at 8:48 AM, Anwar Rizal anriza...@gmail.com wrote: I had that in mind too when Miles Sabin presented Shapeless at Scala.IO Paris last month. If anybody would like to experiment with shapeless in Spark to create something like an R data frame or an Incanter dataset, I would be happy to see it and eventually help. My feeling is, however, that the fact that shapeless moves fast (e.g. in my understanding the latest shapeless requires 2.11) may be a problem.

On Nov 19, 2013 12:46 AM, andy petrella andy.petre...@gmail.com wrote: Maybe I'm wrong, but this use case could be a good fit for Shapeless (https://github.com/milessabin/shapeless) records. Shapeless' records are, so to say, like Lisp's records but typed! In that sense, they're closer to Haskell's record notation, but IMHO less powerful, since access is based on a String (the field name) in Shapeless where Haskell uses pure functions! Anyway, this documentation (https://github.com/milessabin/shapeless/wiki/Feature-overview%3a-shapeless-2.0.0#extensible-records) is self-explanatory, and it is straightforward to see how we (maybe) could use them to simulate an R frame. Thinking out loud: when reading a CSV file, for instance, what would be needed are * a Read[T] for each column, * folding the list of columns by reading each one and prepending the result (combined with the name via ->>) to an HList. The gain would be that we should recover one helpful feature of R's frame, which is: R :: frame$newCol = frame$post - frame$pre // which adds a column to a frame Shpls :: frame2 = frame + (newCol ->> (frame(post) - frame(pre))) // type-safe difference between ints, for instance. Of course, we're not recovering R's frame as-is, because we're simply dealing with rows one by one, where a frame deals with the full table -- but in the case of Spark it would make no sense to mimic that, since we use RDDs for that :-D. I haven't experimented with this yet, but it'd be fun to try; don't know if someone is interested ^^ Cheers andy

On Fri, Nov 15, 2013 at 8:49 PM, Christopher Nguyen c...@adatao.com wrote: Sure, Shay. Let's connect offline. Sent while mobile. Pls excuse typos etc.

On Nov 16, 2013 2:27 AM, Shay Seng s...@1618labs.com wrote: Nice, any possibility of sharing this code in advance?

On Fri, Nov 15, 2013 at 11:22 AM, Christopher Nguyen c...@adatao.com wrote: Shay, we've done this at Adatao, specifically a big data frame in RDD representation and subsetting/projections/data mining/machine learning algorithms on that in-memory table structure. We're planning to harmonize that with the MLBase work in the near future. Just a matter of prioritization on limited resources. If there's enough interest we'll accelerate that. Sent while mobile. Pls excuse typos etc.

On Nov 16, 2013 1:11 AM, Shay Seng s...@1618labs.com wrote: Hi, Is there some way to get R-style Data.Frame data structures into RDDs? I've been using RDD[Seq[]] but this is getting quite error-prone and the code gets pretty hard to read, especially after a few joins, maps etc. Rather than access columns by index, I would prefer to access them by name, e.g. instead of writing myrdd.map(l => Seq(l(0), l(1), l(4), l(9))) I would prefer to write myrdd.map(l => DataFrame(l.id, l.entryTime, l.exitTime, l.cost)). Also, joins are particularly irritating. Currently I have to first construct a pair: somePairRdd.join(myrdd.map(l => ((l(1),l(2)), (l(0),l(1),l(2),l(3))))) Now I have to unzip away the join-key and remap the values into a seq; instead I would rather write someDataFrame.join(myrdd, l => (l.entryTime, l.exitTime)). The question is this: (1) I started writing a DataFrameRDD class that kept track of the column names and column values, and some optional attributes common to the entire dataframe. However, I got a little muddled when trying to figure out what happens when a DataFrameRDD is chained with other operations and gets transformed to other types of RDDs. The value part of the RDD is obvious, but I didn't know the best way to pass on the column and attribute portions of the DataFrame class. I googled around for some documentation on how to write RDDs, but only found a pptx slide presentation with very vague info. Is there a better source of info on how to write RDDs? (2) Even better than info on how to write RDDs, has anyone written an RDD that functions as a DataFrame? :-) tks shay
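The extensible-record idea sketched in andy's message can be made concrete with shapeless 2.x records. Below is a minimal, self-contained sketch; the field names (id, pre, post, diff) are invented for illustration, and whether such records serialize nicely inside Spark closures is a separate question.

```scala
import shapeless._
import syntax.singleton._ // the ->> field constructor
import record._           // record operations: apply, +, keys, values

object FrameRowSketch {
  def main(args: Array[String]): Unit = {
    // One "row" of an R-like frame: a typed, named HList (field names are hypothetical).
    val row = ("id" ->> 1) :: ("pre" ->> 10.0) :: ("post" ->> 15.5) :: HNil

    // Access is by name and statically typed: pre is a Double, not Any.
    val pre: Double = row("pre")

    // Extending the record with a derived column mirrors, row by row,
    // R's frame$newCol = frame$post - frame$pre.
    val extended = row + ("diff" ->> (row("post") - row("pre")))

    println(extended("diff")) // 5.5
  }
}
```

Inside Spark, the same row type could be produced in a map over an RDD[String] after CSV splitting, which is essentially the fold over Read[T] instances described above.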
Re: DataFrame RDDs
Maybe I'm wrong, but this use case could be a good fit for Shapeless (https://github.com/milessabin/shapeless) records. Shapeless' records are, so to say, like Lisp's records but typed! In that sense, they're closer to Haskell's record notation, but IMHO less powerful, since access is based on a String (the field name) in Shapeless where Haskell uses pure functions! Anyway, this documentation (https://github.com/milessabin/shapeless/wiki/Feature-overview%3a-shapeless-2.0.0#extensible-records) is self-explanatory, and it is straightforward to see how we (maybe) could use them to simulate an R frame. Thinking out loud: when reading a CSV file, for instance, what would be needed are * a Read[T] for each column, * folding the list of columns by reading each one and prepending the result (combined with the name via ->>) to an HList. The gain would be that we should recover one helpful feature of R's frame, which is: R :: frame$newCol = frame$post - frame$pre // which adds a column to a frame Shpls :: frame2 = frame + (newCol ->> (frame(post) - frame(pre))) // type-safe difference between ints, for instance. Of course, we're not recovering R's frame as-is, because we're simply dealing with rows one by one, where a frame deals with the full table -- but in the case of Spark it would make no sense to mimic that, since we use RDDs for that :-D. I haven't experimented with this yet, but it'd be fun to try; don't know if someone is interested ^^ Cheers andy

On Fri, Nov 15, 2013 at 8:49 PM, Christopher Nguyen c...@adatao.com wrote: Sure, Shay. Let's connect offline. Sent while mobile. Pls excuse typos etc.

On Nov 16, 2013 2:27 AM, Shay Seng s...@1618labs.com wrote: Nice, any possibility of sharing this code in advance?

On Fri, Nov 15, 2013 at 11:22 AM, Christopher Nguyen c...@adatao.com wrote: Shay, we've done this at Adatao, specifically a big data frame in RDD representation and subsetting/projections/data mining/machine learning algorithms on that in-memory table structure. We're planning to harmonize that with the MLBase work in the near future. Just a matter of prioritization on limited resources. If there's enough interest we'll accelerate that. Sent while mobile. Pls excuse typos etc.

On Nov 16, 2013 1:11 AM, Shay Seng s...@1618labs.com wrote: Hi, Is there some way to get R-style Data.Frame data structures into RDDs? I've been using RDD[Seq[]] but this is getting quite error-prone and the code gets pretty hard to read, especially after a few joins, maps etc. Rather than access columns by index, I would prefer to access them by name, e.g. instead of writing myrdd.map(l => Seq(l(0), l(1), l(4), l(9))) I would prefer to write myrdd.map(l => DataFrame(l.id, l.entryTime, l.exitTime, l.cost)). Also, joins are particularly irritating. Currently I have to first construct a pair: somePairRdd.join(myrdd.map(l => ((l(1),l(2)), (l(0),l(1),l(2),l(3))))) Now I have to unzip away the join-key and remap the values into a seq; instead I would rather write someDataFrame.join(myrdd, l => (l.entryTime, l.exitTime)). The question is this: (1) I started writing a DataFrameRDD class that kept track of the column names and column values, and some optional attributes common to the entire dataframe. However, I got a little muddled when trying to figure out what happens when a DataFrameRDD is chained with other operations and gets transformed to other types of RDDs. The value part of the RDD is obvious, but I didn't know the best way to pass on the column and attribute portions of the DataFrame class. I googled around for some documentation on how to write RDDs, but only found a pptx slide presentation with very vague info. Is there a better source of info on how to write RDDs? (2) Even better than info on how to write RDDs, has anyone written an RDD that functions as a DataFrame? :-) tks shay
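For comparison with the record-based idea, a plain case class already gives Shay most of the named-column ergonomics on top of an ordinary RDD. The sketch below is illustrative only: the Trip fields, the trips.csv path and the cost threshold are all made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical row type replacing RDD[Seq[_]] with named, typed columns.
case class Trip(id: String, entryTime: Long, exitTime: Long, cost: Double)

object NamedColumnsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("named-columns").setMaster("local[2]"))

    // Parse a CSV into Trip rows once; downstream code uses names, not indices.
    val trips = sc.textFile("trips.csv").map(_.split(',')).map { f =>
      Trip(f(0), f(1).toLong, f(2).toLong, f(3).toDouble)
    }

    // Columns by name instead of l(0), l(1), ...
    val expensive = trips.filter(t => t.cost > 100.0)

    // Joins key on named fields instead of hand-built positional tuples.
    val byWindow = trips.map(t => ((t.entryTime, t.exitTime), t))
    // somePairRdd.join(byWindow) would then proceed as usual on that key.

    expensive.take(5).foreach(println)
    sc.stop()
  }
}
```

This does not answer Shay's question (1) about carrying schema and attributes through RDD transformations; it only removes the index bookkeeping per row.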
Re: Spark and geospatial data
Hello Rob, As you may know, I have long experience with geospatial data, and I'm now investigating Spark... So I'll be very interested in further answers, but also in participating in moving this great idea forward! For instance, I'd say that implementing classical geospatial algorithms like classification, feature extraction, pyramid generation and so on could form a geo-extension lib for Spark; this would be easier using the GeoTrellis API. My only question, for now, is that GeoTrellis has its own notion of lineage and Spark does as well, so maybe some harmonization work will have to be done to serialize and schedule them? Maybe Pickles could help for the serialization part... Sorry if I'm missing something (or even saying stupidities ^^)... I'm going now to the thread you mentioned! Looking forward ;) Cheers andy

On Thu, Nov 7, 2013 at 8:49 PM, Rob Emanuele lossy...@gmail.com wrote: Hello, I'm a developer on the GeoTrellis project (http://geotrellis.github.io). We do fast raster processing over large data sets, from web-time (sub-100ms) processing for live endpoints to distributed raster analysis over clusters using Akka clustering. There's currently a discussion underway about moving to support a Spark backend for doing large-scale distributed raster analysis. You can see the discussion here: https://groups.google.com/forum/#!topic/geotrellis-user/wkUOhFwYAvc. Any contributions to the discussion would be welcome. My question to the list is: is there currently any development towards a geospatial data story for Spark, that is, using Spark for large-scale raster/vector spatial data analysis? Is there anyone using Spark currently for this sort of work? Thanks, Rob Emanuele
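For readers wondering what "distributed raster analysis over Spark" could look like at the RDD level, here is a toy, self-contained sketch. The TileKey layout, the tile size and the threshold classifier are all invented for illustration; a real design would come out of the GeoTrellis discussion linked above.

```scala
import org.apache.spark.SparkContext._ // pair/double RDD implicits on older Spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical key for one tile of a tiled raster.
case class TileKey(col: Int, row: Int)

object RasterSketch {
  type Tile = Array[Double] // flattened pixel values of one tile

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("raster-sketch").setMaster("local[2]"))

    // A tiled raster as an RDD of (key, tile) pairs, filled with dummy data here.
    val tiles: RDD[(TileKey, Tile)] = sc.parallelize(
      for (c <- 0 until 4; r <- 0 until 4)
        yield (TileKey(c, r), Array.fill(256 * 256)(scala.util.Random.nextDouble())))

    // Per-tile "classification": threshold each pixel into 0.0 / 1.0.
    val classified = tiles.mapValues(_.map(v => if (v > 0.5) 1.0 else 0.0))

    // A whole-raster statistic without ever assembling the raster in one place.
    val aboveThreshold = classified.map { case (_, t) => t.sum / t.length }.mean()
    println(s"share of pixels above threshold: $aboveThreshold")

    sc.stop()
  }
}
```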
Re: Spark Streaming Server
Hello there, I hope I got the whole thing correctly... but I think I did something quite similar: it polls the Yahoo API for data, which I push into a DStream: https://github.com/andypetrella/spark-bd/blob/master/src/main/scala/yahoo.scala#L41 Any comments or even concerns (if they are constructive ^^) are more than welcome. And, btw, I was thinking the same: this kind of use case should be made easier by the API. Cheers andy

On Wed, Sep 18, 2013 at 9:42 PM, Vadim Chekan kot.bege...@gmail.com wrote: Hi Paul, I had the same thoughts when I started looking at Spark. This link might be helpful: https://github.com/mesos/spark/blob/master/docs/streaming-custom-receivers.md Vadim.

On Tue, Sep 17, 2013 at 6:50 PM, Paul Snively psniv...@icloud.com wrote: Hi everyone! In looking at the Spark Streaming API more closely, it occurs to me that it seems biased toward seeing Spark Streaming processes as consumers of existing services. I'm thinking particularly of socketStream and the zeroMQ stream, which want a host/port and a Publisher, respectively. But I think this overlooks a whole class of use cases, where I'd like to establish a server that someone can stream data to, and use Spark Streaming to process it. Has any work been done on this front? I suppose, worst case, I could write my own NetworkReceiver that listens on a socket or some such, but it'd be nice if I didn't have to. Thoughts? Thanks! Paul -- From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is explicitly specified
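What Paul describes -- a receiver that itself listens as a server -- is roughly the following sketch. It uses the Receiver API from later Spark releases (1.x), where store() pushes records into the stream; the 0.8-era NetworkReceiver class covered in the linked custom-receivers doc differs in its details, and the port and line-based protocol here are arbitrary.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// A receiver that accepts inbound connections and turns each text line
// pushed by a client into a record of the DStream.
class ServerSocketReceiver(port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("server-socket-receiver") {
      override def run(): Unit = serve()
    }.start()
  }

  def onStop(): Unit = () // the accept loop exits once isStopped() becomes true

  private def serve(): Unit = {
    val server = new ServerSocket(port)
    while (!isStopped()) {
      val socket = server.accept() // clients connect to us, not the other way around
      val in = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = in.readLine()
      while (line != null && !isStopped()) {
        store(line) // hand the record to Spark Streaming
        line = in.readLine()
      }
      socket.close()
    }
    server.close()
  }
}

object StreamingServerSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("streaming-server").setMaster("local[2]"), Seconds(5))
    val lines = ssc.receiverStream(new ServerSocketReceiver(9999)) // arbitrary port
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```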