Re: getting different results from same line of code repeated
I'm running into all kinds of problems with Spark 1.5.1 -- does anyone have a version that's working smoothly for them? On Fri, Nov 20, 2015 at 10:50 AM, Dean Wampler <deanwamp...@gmail.com> wrote: > I didn't expect that to fail. I would call it a bug for sure, since it's > practically useless if this method doesn't work. > > Dean Wampler, Ph.D. > Author: Programming Scala, 2nd Edition > <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) > Typesafe <http://typesafe.com> > @deanwampler <http://twitter.com/deanwampler> > http://polyglotprogramming.com > > On Fri, Nov 20, 2015 at 12:45 PM, Walrus theCat <walrusthe...@gmail.com> > wrote: > >> Dean, >> >> What's the point of Scala without magic? :-) >> >> Thanks for your help. It's still giving me unreliable results. There >> just has to be a way to do this in Spark. It's a pretty fundamental thing. >> >> scala> targets.takeOrdered(1) // imported as implicit here >> res23: Array[(String, Int)] = Array() >> >> scala> targets.takeOrdered(1)(CountOrdering) >> res24: Array[(String, Int)] = Array((\bmurders?\b,717)) >> >> scala> targets.takeOrdered(1)(CountOrdering) >> res25: Array[(String, Int)] = Array((\bmurders?\b,717)) >> >> scala> targets.takeOrdered(1)(CountOrdering) >> res26: Array[(String, Int)] = Array((\bguns?\b,1253)) >> >> scala> targets.takeOrdered(1)(CountOrdering) >> res27: Array[(String, Int)] = Array((\bmurders?\b,717)) >> >> >> >> On Wed, Nov 18, 2015 at 6:20 PM, Dean Wampler <deanwamp...@gmail.com> >> wrote: >> >>> You don't have to use sortBy (although that would be better...). You >>> have to define an Ordering object and pass it as the second argument list >>> to takeOrdered()(), or declare it "implicitly". This is more fancy Scala >>> than Spark should require here. Here's an example I've used: >>> >>> // schema with (String,Int). Order by the Int descending >>> object CountOrdering extends Ordering[(String,Int)] { >>> def compare(a:(String,Int), b:(String,Int)) = >>> -(a._2 compare b._2) // - so that it sorts descending >>> } >>> >>> myRDD.takeOrdered(100)(CountOrdering) >>> >>> >>> Or, if you add the keyword "implicit" before "object CountOrdering >>> {...}", then you can omit the second argument list. That's more magic than >>> is justified. ;) >>> >>> HTH, >>> >>> dean >>> >>> >>> Dean Wampler, Ph.D. >>> Author: Programming Scala, 2nd Edition >>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) >>> Typesafe <http://typesafe.com> >>> @deanwampler <http://twitter.com/deanwampler> >>> http://polyglotprogramming.com >>> >>> On Wed, Nov 18, 2015 at 6:37 PM, Walrus theCat <walrusthe...@gmail.com> >>> wrote: >>> >>>> Dean, >>>> >>>> Thanks a lot. Very helpful. How would I use takeOrdered to order by >>>> the second member of the tuple, as I am attempting to do with >>>> rdd.sortBy(_._2).first? >>>> >>>> On Wed, Nov 18, 2015 at 4:24 PM, Dean Wampler <deanwamp...@gmail.com> >>>> wrote: >>>> >>>>> Someone please correct me if I'm wrong, but I think the answer is >>>>> actually "it's not implemented that way" in the sort methods, and it >>>>> should >>>>> either be documented more explicitly or fixed. >>>>> >>>>> Reading the Spark source code, it looks like each partition is sorted >>>>> internally, and each partition holds a contiguous range of keys in the >>>>> RDD. >>>>> So, if you know which order the partitions should be in, you can produce a >>>>> total order and hence allow take(n) to do what you expect. 
>>>>> >>>>> The take(n) appears to walk the list of partitions in order, but it's >>>>> that list that's not deterministic. I can't find any evidence that the RDD >>>>> output by sortBy has this list of partitions in the correct order. So, >>>>> each >>>>> time you ran your job, the "targets" RDD had sorted partitions, but the >>>>> list of partitions itself was not properly ordered globally. When you got >>>>> an exception, probably the first partition happened to be empty.
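A compact alternative to defining a standalone Ordering object is Ordering.by. A minimal sketch, assuming targets keeps the (String, Int) shape shown in the thread (the helper names are only for illustration):

import org.apache.spark.rdd.RDD

// Sketch: top-n (pattern, count) pairs by count, highest first.
// top() takes the largest elements under the given Ordering, so there is no
// reliance on how sortBy happens to arrange its output partitions.
def topByCount(targets: RDD[(String, Int)], n: Int): Array[(String, Int)] =
  targets.top(n)(Ordering.by[(String, Int), Int](_._2))

// takeOrdered takes the *smallest* elements, so reverse the Ordering
// to get the highest counts first.
def topByCount2(targets: RDD[(String, Int)], n: Int): Array[(String, Int)] =
  targets.takeOrdered(n)(Ordering.by[(String, Int), Int](_._2).reverse)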
getting different results from same line of code repeated
Hi, I'm launching a Spark cluster with the spark-ec2 script and playing around in spark-shell. I'm running the same line of code over and over again, and getting different results, and sometimes exceptions. Towards the end, after I cache the first RDD, it gives me the correct result multiple times in a row before throwing an exception. How can I get correct behavior out of these operations on these RDDs?

scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[116] at sortBy at <console>:36

scala> targets.first
res26: (String, Int) = (\bguns?\b,1253)

scala> val targets = data map {_.REGEX} groupBy{identity} map { Function.tupled(_->_.size)} sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[125] at sortBy at <console>:36

scala> targets.first
res27: (String, Int) = (nika,7)

scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[134] at sortBy at <console>:36

scala> targets.first
res28: (String, Int) = (\bcalientes?\b,6)

scala> targets.sortBy(_._2,false).first
java.lang.UnsupportedOperationException: empty collection

scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[283] at sortBy at <console>:36

scala> targets.first
res46: (String, Int) = (\bhurting\ yous?\b,8)

scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[292] at sortBy at <console>:36

scala> targets.first
java.lang.UnsupportedOperationException: empty collection

scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[301] at sortBy at <console>:36

scala> targets.first
res48: (String, Int) = (\bguns?\b,1253)

scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[310] at sortBy at <console>:36

scala> targets.first
res49: (String, Int) = (\bguns?\b,1253)

scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[319] at sortBy at <console>:36

scala> targets.first
res50: (String, Int) = (\bguns?\b,1253)

scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[328] at sortBy at <console>:36

scala> targets.first
java.lang.UnsupportedOperationException: empty collection
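For completeness, the nondeterminism above can also be sidestepped by asking for the maximum directly instead of sorting and taking the first element. A sketch, assuming targets keeps the (String, Int) shape from the transcript:

import org.apache.spark.rdd.RDD

// Sketch: highest-count pair without a global sort, so no dependence on
// how the sorted RDD's partitions happen to be ordered.
def highest(targets: RDD[(String, Int)]): (String, Int) =
  targets.reduce((a, b) => if (a._2 >= b._2) a else b)

// Equivalent, using max with an explicit Ordering on the count:
def highestViaMax(targets: RDD[(String, Int)]): (String, Int) =
  targets.max()(Ordering.by[(String, Int), Int](_._2))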
Re: send transformed RDD to s3 from slaves
Update: You can now answer this on stackoverflow for 100 bounty: http://stackoverflow.com/questions/33704073/how-to-send-transformed-data-from-partitions-to-s3 On Fri, Nov 13, 2015 at 4:56 PM, Walrus theCat <walrusthe...@gmail.com> wrote: > Hi, > > I have an RDD which crashes the driver when being collected. I want to > send the data on its partitions out to S3 without bringing it back to the > driver. I try calling rdd.foreachPartition, but the data that gets sent has > not gone through the chain of transformations that I need. It's the data > as it was ingested initially. After specifying my chain of > transformations, but before calling foreachPartition, I call rdd.count in > order to force the RDD to transform. The data it sends out is still not > transformed. How do I get the RDD to send out transformed data when > calling foreachPartition? > > Thanks >
send transformed RDD to s3 from slaves
Hi, I have an RDD which crashes the driver when being collected. I want to send the data on its partitions out to S3 without bringing it back to the driver. I try calling rdd.foreachPartition, but the data that gets sent has not gone through the chain of transformations that I need. It's the data as it was ingested initially. After specifying my chain of transformations, but before calling foreachPartition, I call rdd.count in order to force the RDD to transform. The data it sends out is still not transformed. How do I get the RDD to send out transformed data when calling foreachPartition? Thanks
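A likely culprit (hard to say more without the code) is calling foreachPartition on the original RDD reference rather than on the RDD returned by the transformations; RDDs are immutable, so map and friends return a new RDD and leave the original untouched. A minimal sketch, with uploadPartition standing in for whatever S3 client call actually does the write:

import org.apache.spark.rdd.RDD

object S3UploadSketch {
  // Hypothetical stand-in for a real S3 upload; whatever runs inside
  // foreachPartition executes on the executor that holds the partition,
  // so the data never has to be collected back to the driver.
  def uploadPartition(records: Iterator[String]): Unit =
    println(s"would upload ${records.length} records to S3")

  def sendTransformed(raw: RDD[String]): Unit = {
    val transformed = raw.map(_.toUpperCase)      // stand-in for the real transformation chain
    transformed.foreachPartition(uploadPartition) // ships the transformed records
    // raw.foreachPartition(uploadPartition)      // would ship the data as originally ingested
  }
}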
maven doesn't build dependencies with Scala 2.11
Hi, When I run this:

dev/change-version-to-2.11.sh
mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package

as per here https://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211, maven doesn't build Spark's dependencies. Only when I run:

dev/change-version-to-2.11.sh
sbt/sbt -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package

as gathered from here https://github.com/ScrapCodes/spark-1/blob/patch-3/docs/building-spark.md, do I get Spark's dependencies built without any cross-compilation errors.

*Question*:
- How can I make maven do this?
- How can I specify the use of Scala 2.11 in my own .pom files?

Thanks
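On the second question: the convention (in Maven and sbt alike) is that Scala artifacts carry the Scala binary version in the artifactId, so a 2.11 build depends on spark-core_2.11 rather than spark-core_2.10. A build.sbt sketch of the same idea (the Spark version shown is just an example; in a pom.xml the _2.11 suffix is written out by hand in the artifactId):

// build.sbt sketch -- assumes a Spark release that publishes _2.11 artifacts
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  // %% appends "_2.11" to the artifact name automatically
  "org.apache.spark" %% "spark-core" % "1.4.1" % "provided",
  // identical effect, spelling the suffix out explicitly
  "org.apache.spark" % "spark-sql_2.11" % "1.4.1" % "provided"
)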
Re: SparkSQL 1.2.0 sources API error
I'm getting this also, with Scala 2.11 and Scala 2.10:

15/01/18 07:34:51 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/01/18 07:34:51 INFO Remoting: Starting remoting
15/01/18 07:34:51 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-7] shutting down ActorSystem [sparkDriver]
java.lang.NoSuchMethodError: org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(Ljava/util/concurrent/Executor;I)V
    at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:283)
    at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:240)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    at scala.util.Success.flatMap(Try.scala:200)
    at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
    at akka.remote.EndpointManager$$anonfun$9.apply(Remoting.scala:692)
    at akka.remote.EndpointManager$$anonfun$9.apply(Remoting.scala:684)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
    at akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:684)
    at akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:492)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/01/18 07:34:51 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/01/18 07:34:51 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/01/18 07:34:51 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at akka.remote.Remoting.start(Remoting.scala:180)
    at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
    at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
    at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
    at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
    at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
    at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1676)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1667)
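The NoSuchMethodError on NioWorkerPool.<init>(Executor, int) is usually a Netty version clash: an older org.jboss.netty 3.2.x pulled in transitively (commonly by Hadoop or HBase artifacts) shadows the newer Netty that Akka's remoting expects. A hedged sbt sketch of the usual workaround; which artifact actually drags in the stale jar has to be confirmed from the dependency tree first (mvn dependency:tree, or the sbt dependency-graph plugin):

// build.sbt sketch -- assumes the stale Netty arrives via hadoop-client
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.2.0" % "provided",
  ("org.apache.hadoop" % "hadoop-client" % "2.4.0")
    .exclude("org.jboss.netty", "netty")  // drop the legacy 3.2.x jar
)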
Re: using multiple dstreams together (spark streaming)
Thanks! On Wed, Jul 16, 2014 at 6:34 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Have you taken a look at DStream.transformWith( ... ) . That allows you apply arbitrary transformation between RDDs (of the same timestamp) of two different streams. So you can do something like this. 2s-window-stream.transformWith(1s-window-stream, (rdd1: RDD[...], rdd2: RDD[...]) = { ... // return a new RDD }) And streamingContext.transform() extends it to N DStreams. :) Hope this helps! TD On Wed, Jul 16, 2014 at 10:42 AM, Walrus theCat walrusthe...@gmail.com wrote: hey at least it's something (thanks!) ... not sure what i'm going to do if i can't find a solution (other than not use spark) as i really need these capabilities. anyone got anything else? On Wed, Jul 16, 2014 at 10:34 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: hum... maybe consuming all streams at the same time with an actor that would act as a new DStream source... but this is just a random idea... I don't really know if that would be a good idea or even possible. 2014-07-16 18:30 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Yeah -- I tried the .union operation and it didn't work for that reason. Surely there has to be a way to do this, as I imagine this is a commonly desired goal in streaming applications? On Wed, Jul 16, 2014 at 10:10 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I'm joining several kafka dstreams using the join operation but you have the limitation that the duration of the batch has to be same,i.e. 1 second window for all dstreams... so it would not work for you. 2014-07-16 18:08 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
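A slightly fuller sketch of the transformWith approach, assuming both DStreams come from the same StreamingContext and carry (String, Int) pairs; the join-and-compare body is only a placeholder for whatever delta logic is actually needed:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Sketch: pair up the RDDs that the 1s-window and 2s-window DStreams
// produce for the same batch time, then compare them with ordinary RDD ops.
def compareWindows(oneSec: DStream[(String, Int)],
                   twoSec: DStream[(String, Int)]): DStream[(String, (Int, Int))] =
  oneSec.transformWith(twoSec, (rdd1: RDD[(String, Int)], rdd2: RDD[(String, Int)]) =>
    rdd1.join(rdd2)                                // (key, (count1s, count2s))
        .filter { case (_, (c1, c2)) => c1 != c2 } // keep keys whose windows disagree
  )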
using multiple dstreams together (spark streaming)
Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
Re: using multiple dstreams together (spark streaming)
Yeah -- I tried the .union operation and it didn't work for that reason. Surely there has to be a way to do this, as I imagine this is a commonly desired goal in streaming applications? On Wed, Jul 16, 2014 at 10:10 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I'm joining several kafka dstreams using the join operation but you have the limitation that the duration of the batch has to be same,i.e. 1 second window for all dstreams... so it would not work for you. 2014-07-16 18:08 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
Re: using multiple dstreams together (spark streaming)
Or, if not, is there a way to do this in terms of a single dstream? Keep in mind that dstream1, dstream2, and dstream3 have already had transformations applied. I tried creating the dstreams by calling .window on the first one, but that ends up with me having ... 3 dstreams... which is the same problem. On Wed, Jul 16, 2014 at 10:30 AM, Walrus theCat walrusthe...@gmail.com wrote: Yeah -- I tried the .union operation and it didn't work for that reason. Surely there has to be a way to do this, as I imagine this is a commonly desired goal in streaming applications? On Wed, Jul 16, 2014 at 10:10 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I'm joining several kafka dstreams using the join operation but you have the limitation that the duration of the batch has to be same,i.e. 1 second window for all dstreams... so it would not work for you. 2014-07-16 18:08 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
Re: using multiple dstreams together (spark streaming)
hey at least it's something (thanks!) ... not sure what i'm going to do if i can't find a solution (other than not use spark) as i really need these capabilities. anyone got anything else? On Wed, Jul 16, 2014 at 10:34 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: hum... maybe consuming all streams at the same time with an actor that would act as a new DStream source... but this is just a random idea... I don't really know if that would be a good idea or even possible. 2014-07-16 18:30 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Yeah -- I tried the .union operation and it didn't work for that reason. Surely there has to be a way to do this, as I imagine this is a commonly desired goal in streaming applications? On Wed, Jul 16, 2014 at 10:10 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I'm joining several kafka dstreams using the join operation but you have the limitation that the duration of the batch has to be same,i.e. 1 second window for all dstreams... so it would not work for you. 2014-07-16 18:08 GMT+01:00 Walrus theCat walrusthe...@gmail.com: Hi, My application has multiple dstreams on the same inputstream: dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window I want to write logic that deals with all three windows (e.g. when the 1 second window differs from the 2 second window by some delta ...) I've found some examples online (there's not much out there!), and I can only see people transforming a single dstream. In conventional spark, we'd do this sort of thing with a cartesian on RDDs. How can I deal with multiple Dstreams at once? Thanks
Re: truly bizarre behavior with local[n] on Spark 1.0.1
This is (obviously) spark streaming, by the way. On Mon, Jul 14, 2014 at 8:27 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I've got a socketTextStream through which I'm reading input. I have three Dstreams, all of which are the same window operation over that socketTextStream. I have a four core machine. As we've been covering lately, I have to give a cores parameter to my StreamingSparkContext: ssc = new StreamingContext(local[4] /**TODO change once a cluster is up **/, AppName, Seconds(1)) Now, I have three dstreams, and all I ask them to do is print or count. I should preface this with the statement that they all work on their own. dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window If I construct the ssc with local[8], and put these statements in this order, I get prints on the first one, and zero counts on the second one: ssc(local[8]) // hyperthread dat sheezy dstream1.print // works dstream2.count.print // always prints 0 If I do this, this happens: ssc(local[4]) dstream1.print // doesn't work, just gives me the Time: ms message dstream2.count.print // doesn't work, prints 0 ssc(local[6]) dstream1.print // doesn't work, just gives me the Time: ms message dstream2.count.print // works, prints 1 Sometimes these results switch up, seemingly at random. How can I get things to the point where I can develop and test my application locally? Thanks
Re: truly bizarre behavior with local[n] on Spark 1.0.1
Will do. On Tue, Jul 15, 2014 at 12:56 PM, Tathagata Das tathagata.das1...@gmail.com wrote: This sounds really really weird. Can you give me a piece of code that I can run to reproduce this issue myself? TD On Tue, Jul 15, 2014 at 12:02 AM, Walrus theCat walrusthe...@gmail.com wrote: This is (obviously) spark streaming, by the way. On Mon, Jul 14, 2014 at 8:27 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I've got a socketTextStream through which I'm reading input. I have three Dstreams, all of which are the same window operation over that socketTextStream. I have a four core machine. As we've been covering lately, I have to give a cores parameter to my StreamingSparkContext: ssc = new StreamingContext(local[4] /**TODO change once a cluster is up **/, AppName, Seconds(1)) Now, I have three dstreams, and all I ask them to do is print or count. I should preface this with the statement that they all work on their own. dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window If I construct the ssc with local[8], and put these statements in this order, I get prints on the first one, and zero counts on the second one: ssc(local[8]) // hyperthread dat sheezy dstream1.print // works dstream2.count.print // always prints 0 If I do this, this happens: ssc(local[4]) dstream1.print // doesn't work, just gives me the Time: ms message dstream2.count.print // doesn't work, prints 0 ssc(local[6]) dstream1.print // doesn't work, just gives me the Time: ms message dstream2.count.print // works, prints 1 Sometimes these results switch up, seemingly at random. How can I get things to the point where I can develop and test my application locally? Thanks
truly bizarre behavior with local[n] on Spark 1.0.1
Hi, I've got a socketTextStream through which I'm reading input. I have three Dstreams, all of which are the same window operation over that socketTextStream. I have a four core machine. As we've been covering lately, I have to give a cores parameter to my StreamingSparkContext: ssc = new StreamingContext(local[4] /**TODO change once a cluster is up **/, AppName, Seconds(1)) Now, I have three dstreams, and all I ask them to do is print or count. I should preface this with the statement that they all work on their own. dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window If I construct the ssc with local[8], and put these statements in this order, I get prints on the first one, and zero counts on the second one: ssc(local[8]) // hyperthread dat sheezy dstream1.print // works dstream2.count.print // always prints 0 If I do this, this happens: ssc(local[4]) dstream1.print // doesn't work, just gives me the Time: ms message dstream2.count.print // doesn't work, prints 0 ssc(local[6]) dstream1.print // doesn't work, just gives me the Time: ms message dstream2.count.print // works, prints 1 Sometimes these results switch up, seemingly at random. How can I get things to the point where I can develop and test my application locally? Thanks
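For the archive: each receiver-based input stream (socketTextStream included) pins one core for the whole run, so a local master needs strictly more cores than receivers before any batch processing can happen, which is consistent with the flaky behavior described above. A minimal sketch, with the port number being an assumption:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: one socket receiver occupies one core permanently, so "local[1]"
// leaves nothing for processing; "local[2]" or higher lets output appear.
object LocalStreamingSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "AppName", Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999) // assumed port
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}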
can't print DStream after reduce
Hi, I have a DStream that works just fine when I say: dstream.print If I say: dstream.map(_,1).print that works, too. However, if I do the following: dstream.reduce{case(x,y) => x}.print I don't get anything on my console. What's going on? Thanks
Re: can't print DStream after reduce
Update on this: val lines = ssc.socketTextStream(localhost, ) lines.print // works lines.map(_->1).print // works lines.map(_->1).reduceByKey(_+_).print // nothing printed to driver console Just lots of: 14/07/13 11:37:40 INFO receiver.BlockGenerator: Pushed block input-0-1405276660400 14/07/13 11:37:41 INFO scheduler.ReceiverTracker: Stream 0 received 1 blocks 14/07/13 11:37:41 INFO scheduler.JobScheduler: Added jobs for time 1405276661000 ms 14/07/13 11:37:41 INFO storage.MemoryStore: ensureFreeSpace(60) called with curMem=1275, maxMem=98539929 14/07/13 11:37:41 INFO storage.MemoryStore: Block input-0-1405276661400 stored as bytes to memory (size 60.0 B, free 94.0 MB) 14/07/13 11:37:41 INFO storage.BlockManagerInfo: Added input-0-1405276661400 in memory on 25.17.218.118:55820 (size: 60.0 B, free: 94.0 MB) 14/07/13 11:37:41 INFO storage.BlockManagerMaster: Updated info of block input-0-1405276661400 Any insight? Thanks On Sun, Jul 13, 2014 at 1:03 AM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a DStream that works just fine when I say: dstream.print If I say: dstream.map(_,1).print that works, too. However, if I do the following: dstream.reduce{case(x,y) => x}.print I don't get anything on my console. What's going on? Thanks
Re: can't print DStream after reduce
Thanks for your interest. lines.foreachRDD(x => println(x.count)) And I got 0 every once in a while (which I think is strange, because lines.print prints the input I'm giving it over the socket.) When I tried: lines.map(_->1).reduceByKey(_+_).foreachRDD(x => println(x.count)) I got no count. Thanks On Sun, Jul 13, 2014 at 11:34 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Try doing DStream.foreachRDD and then printing the RDD count and further inspecting the RDD. On Jul 13, 2014 1:03 AM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a DStream that works just fine when I say: dstream.print If I say: dstream.map(_,1).print that works, too. However, if I do the following: dstream.reduce{case(x,y) => x}.print I don't get anything on my console. What's going on? Thanks
Re: can't print DStream after reduce
More strange behavior: lines.foreachRDD(x = println(x.first)) // works lines.foreachRDD(x = println((x.count,x.first))) // no output is printed to driver console On Sun, Jul 13, 2014 at 11:47 AM, Walrus theCat walrusthe...@gmail.com wrote: Thanks for your interest. lines.foreachRDD(x = println(x.count)) And I got 0 every once in a while (which I think is strange, because lines.print prints the input I'm giving it over the socket.) When I tried: lines.map(_-1).reduceByKey(_+_).foreachRDD(x = println(x.count)) I got no count. Thanks On Sun, Jul 13, 2014 at 11:34 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Try doing DStream.foreachRDD and then printing the RDD count and further inspecting the RDD. On Jul 13, 2014 1:03 AM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a DStream that works just fine when I say: dstream.print If I say: dstream.map(_,1).print that works, too. However, if I do the following: dstream.reduce{case(x,y) = x}.print I don't get anything on my console. What's going on? Thanks
Re: can't print DStream after reduce
Great success! I was able to get output to the driver console by changing the construction of the Streaming Spark Context from: val ssc = new StreamingContext(local /**TODO change once a cluster is up **/, AppName, Seconds(1)) to: val ssc = new StreamingContext(local[2] /**TODO change once a cluster is up **/, AppName, Seconds(1)) I found something that tipped me off that this might work by digging through this mailing list. On Sun, Jul 13, 2014 at 2:00 PM, Walrus theCat walrusthe...@gmail.com wrote: More strange behavior: lines.foreachRDD(x = println(x.first)) // works lines.foreachRDD(x = println((x.count,x.first))) // no output is printed to driver console On Sun, Jul 13, 2014 at 11:47 AM, Walrus theCat walrusthe...@gmail.com wrote: Thanks for your interest. lines.foreachRDD(x = println(x.count)) And I got 0 every once in a while (which I think is strange, because lines.print prints the input I'm giving it over the socket.) When I tried: lines.map(_-1).reduceByKey(_+_).foreachRDD(x = println(x.count)) I got no count. Thanks On Sun, Jul 13, 2014 at 11:34 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Try doing DStream.foreachRDD and then printing the RDD count and further inspecting the RDD. On Jul 13, 2014 1:03 AM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a DStream that works just fine when I say: dstream.print If I say: dstream.map(_,1).print that works, too. However, if I do the following: dstream.reduce{case(x,y) = x}.print I don't get anything on my console. What's going on? Thanks
Re: not getting output from socket connection
Hah, thanks for tidying up the paper trail here, but I was the OP (and solver) of the recent reduce thread that ended in this solution. On Sun, Jul 13, 2014 at 4:26 PM, Michael Campbell michael.campb...@gmail.com wrote: Make sure you use local[n] (where n 1) in your context setup too, (if you're running locally), or you won't get output. On Sat, Jul 12, 2014 at 11:36 PM, Walrus theCat walrusthe...@gmail.com wrote: Thanks! I thought it would get passed through netcat, but given your email, I was able to follow this tutorial and get it to work: http://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html On Fri, Jul 11, 2014 at 1:31 PM, Sean Owen so...@cloudera.com wrote: netcat is listening for a connection on port . It is echoing what you type to its console to anything that connects to and reads. That is what Spark streaming does. If you yourself connect to and write, nothing happens except that netcat echoes it. This does not cause Spark to somehow get that data. nc is only echoing input from the console. On Fri, Jul 11, 2014 at 9:25 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a java application that is outputting a string every second. I'm running the wordcount example that comes with Spark 1.0, and running nc -lk . When I type words into the terminal running netcat, I get counts. However, when I write the String onto a socket on port , I don't get counts. I can see the strings showing up in the netcat terminal, but no counts from Spark. If I paste in the string, I get counts. Any ideas? Thanks
Re: not getting output from socket connection
Thanks! I thought it would get passed through netcat, but given your email, I was able to follow this tutorial and get it to work: http://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html On Fri, Jul 11, 2014 at 1:31 PM, Sean Owen so...@cloudera.com wrote: netcat is listening for a connection on port . It is echoing what you type to its console to anything that connects to and reads. That is what Spark streaming does. If you yourself connect to and write, nothing happens except that netcat echoes it. This does not cause Spark to somehow get that data. nc is only echoing input from the console. On Fri, Jul 11, 2014 at 9:25 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a java application that is outputting a string every second. I'm running the wordcount example that comes with Spark 1.0, and running nc -lk . When I type words into the terminal running netcat, I get counts. However, when I write the String onto a socket on port , I don't get counts. I can see the strings showing up in the netcat terminal, but no counts from Spark. If I paste in the string, I get counts. Any ideas? Thanks
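For anyone hitting the same wall: socketTextStream connects out to a listening server and reads newline-terminated text from it, so the producing application has to accept the connection and write lines to the accepted socket (which is what the linked tutorial builds). A minimal Scala sketch of such a server, with the port as an assumption:

import java.io.PrintWriter
import java.net.ServerSocket

// Sketch: a tiny line server for Spark's socketTextStream to connect to.
// It serves one client (the Spark receiver) and emits a line per second.
object LineServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)       // assumed port
    val client = server.accept()              // blocks until the receiver connects
    val out = new PrintWriter(client.getOutputStream, true) // autoflush on println
    while (true) {
      out.println(s"the quick brown fox ${System.currentTimeMillis}")
      Thread.sleep(1000)
    }
  }
}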
not getting output from socket connection
Hi, I have a java application that is outputting a string every second. I'm running the wordcount example that comes with Spark 1.0, and running nc -lk . When I type words into the terminal running netcat, I get counts. However, when I write the String onto a socket on port , I don't get counts. I can see the strings showing up in the netcat terminal, but no counts from Spark. If I paste in the string, I get counts. Any ideas? Thanks
Re: not getting output from socket connection
I forgot to add that I get the same behavior if I tail -f | nc localhost on a log file. On Fri, Jul 11, 2014 at 1:25 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a java application that is outputting a string every second. I'm running the wordcount example that comes with Spark 1.0, and running nc -lk . When I type words into the terminal running netcat, I get counts. However, when I write the String onto a socket on port , I don't get counts. I can see the strings showing up in the netcat terminal, but no counts from Spark. If I paste in the string, I get counts. Any ideas? Thanks
Re: How can adding a random count() change the behavior of my program?
Nick, I have encountered strange things like this before (usually when programming with mutable structures and side-effects), and for me, the answer was that, until .count (or .first, or similar), is called, your variable 'a' refers to a set of instructions that only get executed to form the object you expect when you're asking something of it. Back before I was using side-effect-free techniques on immutable data structures, I had to call .first or .count or similar to get the behavior I wanted. There are still special cases where I have to purposefully collapse the RDD for some reason or another. This may not be new information to you, but I've encountered similar behavior before and highly suspect this is playing a role here. On Mon, May 5, 2014 at 5:52 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I’m running into something very strange today. I’m getting an error on the follow innocuous operations. a = sc.textFile('s3n://...') a = a.repartition(8) a = a.map(...) c = a.countByKey() # ERRORs out on this action. See below for traceback. [1] If I add a count() right after the repartition(), this error magically goes away. a = sc.textFile('s3n://...') a = a.repartition(8) print a.count() a = a.map(...) c = a.countByKey() # A-OK! WTF? To top it off, this “fix” is inconsistent. Sometimes, I still get this error. This is strange. How do I get to the bottom of this? Nick [1] Here’s the traceback: Traceback (most recent call last): File stdin, line 7, in module File file.py, line 187, in function_blah c = a.countByKey() File /root/spark/python/pyspark/rdd.py, line 778, in countByKey return self.map(lambda x: x[0]).countByValue() File /root/spark/python/pyspark/rdd.py, line 624, in countByValue return self.mapPartitions(countPartition).reduce(mergeMaps) File /root/spark/python/pyspark/rdd.py, line 505, in reduce vals = self.mapPartitions(func).collect() File /root/spark/python/pyspark/rdd.py, line 469, in collect bytesInJava = self._jrdd.collect().iterator() File /root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 537, in __call__ File /root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o46.collect. -- View this message in context: How can adding a random count() change the behavior of my program?http://apache-spark-user-list.1001560.n3.nabble.com/How-can-adding-a-random-count-change-the-behavior-of-my-program-tp5406.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com.
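To make the laziness point concrete in code (a Scala sketch with a spark-shell sc in scope; the path is made up and nothing here is specific to the job in the quoted message):

// Sketch: transformations only record a lineage; no work happens until an action.
val a = sc.textFile("hdfs:///some/path") // assumed path; nothing is read yet
val b = a.map(_.length)                  // still nothing executed
val c = b.filter(_ > 10)                 // still nothing executed
val n = c.count()                        // action: the whole chain runs here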
Re: can't sc.paralellize in Spark 0.7.3 spark-shell
Actually altering the classpath in the REPL causes the provided SparkContext to disappear: scala sc.parallelize(List(1,2,3)) res0: spark.RDD[Int] = ParallelCollectionRDD[0] at parallelize at console:13 scala :cp /root Added '/root'. Your new classpath is: :/root/jars/aspectjrt.jar:/root/jars/aspectjweaver.jar:/root/jars/aws-java-sdk-1.4.5.jar:/root/jars/aws-java-sdk-1.4.5-javadoc.jar:/root/jars/aws-java-sdk-1.4.5-sources.jar:/root/jars/aws-java-sdk-flow-build-tools-1.4.5.jar:/root/jars/commons-codec-1.3.jar:/root/jars/commons-logging-1.1.1.jar:/root/jars/freemarker-2.3.18.jar:/root/jars/httpclient-4.1.1.jar:/root/jars/httpcore-4.1.jar:/root/jars/jackson-core-asl-1.8.7.jar:/root/jars/mail-1.4.3.jar:/root/jars/spring-beans-3.0.7.jar:/root/jars/spring-context-3.0.7.jar:/root/jars/spring-core-3.0.7.jar:/root/jars/stax-1.2.0.jar:/root/jars/stax-api-1.0.1.jar:/root/spark/conf:/root/spark/core/target/scala-2.9.3/classes:/root/spark/core/src/main/resources:/root/spark/repl/target/scala-2.9.3/classes:/root/spark/examples/target/scala-2.9.3/classes:/root/spark/streaming/target/scala-2.9.3/classes:/root/spark/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/*:/root/spark/lib_managed/jars/*:/root/spark/lib_managed/bundles/*:/root/spark/repl/lib/*:/root/spark/bagel/target/scala-2.9.3/classes:/root/spark/python/lib/py4j0.7.jar:/root 14/04/15 18:19:37 INFO server.Server: jetty-7.6.8.v20121106 14/04/15 18:19:37 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:48978 Replaying: sc.parallelize(List(1,2,3)) console:8: error: not found: value sc sc.parallelize(List(1,2,3)) On Mon, Apr 14, 2014 at 7:51 PM, Walrus theCat walrusthe...@gmail.comwrote: Nevermind -- I'm like 90% sure the problem is that I'm importing stuff that declares a SparkContext as sc. If it's not, I'll report back. On Mon, Apr 14, 2014 at 2:55 PM, Walrus theCat walrusthe...@gmail.comwrote: Hi, Using the spark-shell, I can't sc.parallelize to get an RDD. Looks like a bug. scala sc.parallelize(Array(a,s,d)) java.lang.NullPointerException at init(console:17) at init(console:22) at init(console:24) at init(console:26) at init(console:28) at init(console:30) at init(console:32) at init(console:34) at init(console:36) at .init(console:40) at .clinit(console) at .init(console:11) at .clinit(console) at $export(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:629) at spark.repl.SparkIMain$Request$$anonfun$10.apply(SparkIMain.scala:890) at scala.tools.nsc.interpreter.Line$$anonfun$1.apply$mcV$sp(Line.scala:43) at scala.tools.nsc.io.package$$anon$2.run(package.scala:25) at java.lang.Thread.run(Thread.java:744)
Re: can't sc.paralellize in Spark 0.7.3 spark-shell
Dankeschön ! On Tue, Apr 15, 2014 at 11:29 AM, Aaron Davidson ilike...@gmail.com wrote: This is probably related to the Scala bug that :cp does not work: https://issues.scala-lang.org/browse/SI-6502 On Tue, Apr 15, 2014 at 11:21 AM, Walrus theCat walrusthe...@gmail.comwrote: Actually altering the classpath in the REPL causes the provided SparkContext to disappear: scala sc.parallelize(List(1,2,3)) res0: spark.RDD[Int] = ParallelCollectionRDD[0] at parallelize at console:13 scala :cp /root Added '/root'. Your new classpath is: :/root/jars/aspectjrt.jar:/root/jars/aspectjweaver.jar:/root/jars/aws-java-sdk-1.4.5.jar:/root/jars/aws-java-sdk-1.4.5-javadoc.jar:/root/jars/aws-java-sdk-1.4.5-sources.jar:/root/jars/aws-java-sdk-flow-build-tools-1.4.5.jar:/root/jars/commons-codec-1.3.jar:/root/jars/commons-logging-1.1.1.jar:/root/jars/freemarker-2.3.18.jar:/root/jars/httpclient-4.1.1.jar:/root/jars/httpcore-4.1.jar:/root/jars/jackson-core-asl-1.8.7.jar:/root/jars/mail-1.4.3.jar:/root/jars/spring-beans-3.0.7.jar:/root/jars/spring-context-3.0.7.jar:/root/jars/spring-core-3.0.7.jar:/root/jars/stax-1.2.0.jar:/root/jars/stax-api-1.0.1.jar:/root/spark/conf:/root/spark/core/target/scala-2.9.3/classes:/root/spark/core/src/main/resources:/root/spark/repl/target/scala-2.9.3/classes:/root/spark/examples/target/scala-2.9.3/classes:/root/spark/streaming/target/scala-2.9.3/classes:/root/spark/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/*:/root/spark/lib_managed/jars/*:/root/spark/lib_managed/bundles/*:/root/spark/repl/lib/*:/root/spark/bagel/target/scala-2.9.3/classes:/root/spark/python/lib/py4j0.7.jar:/root 14/04/15 18:19:37 INFO server.Server: jetty-7.6.8.v20121106 14/04/15 18:19:37 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:48978 Replaying: sc.parallelize(List(1,2,3)) console:8: error: not found: value sc sc.parallelize(List(1,2,3)) On Mon, Apr 14, 2014 at 7:51 PM, Walrus theCat walrusthe...@gmail.comwrote: Nevermind -- I'm like 90% sure the problem is that I'm importing stuff that declares a SparkContext as sc. If it's not, I'll report back. On Mon, Apr 14, 2014 at 2:55 PM, Walrus theCat walrusthe...@gmail.comwrote: Hi, Using the spark-shell, I can't sc.parallelize to get an RDD. Looks like a bug. scala sc.parallelize(Array(a,s,d)) java.lang.NullPointerException at init(console:17) at init(console:22) at init(console:24) at init(console:26) at init(console:28) at init(console:30) at init(console:32) at init(console:34) at init(console:36) at .init(console:40) at .clinit(console) at .init(console:11) at .clinit(console) at $export(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:629) at spark.repl.SparkIMain$Request$$anonfun$10.apply(SparkIMain.scala:890) at scala.tools.nsc.interpreter.Line$$anonfun$1.apply$mcV$sp(Line.scala:43) at scala.tools.nsc.io.package$$anon$2.run(package.scala:25) at java.lang.Thread.run(Thread.java:744)
can't sc.paralellize in Spark 0.7.3 spark-shell
Hi, Using the spark-shell, I can't sc.parallelize to get an RDD. Looks like a bug.

scala> sc.parallelize(Array("a","s","d"))
java.lang.NullPointerException
    at <init>(<console>:17)
    at <init>(<console>:22)
    at <init>(<console>:24)
    at <init>(<console>:26)
    at <init>(<console>:28)
    at <init>(<console>:30)
    at <init>(<console>:32)
    at <init>(<console>:34)
    at <init>(<console>:36)
    at .<init>(<console>:40)
    at .<clinit>(<console>)
    at .<init>(<console>:11)
    at .<clinit>(<console>)
    at $export(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:629)
    at spark.repl.SparkIMain$Request$$anonfun$10.apply(SparkIMain.scala:890)
    at scala.tools.nsc.interpreter.Line$$anonfun$1.apply$mcV$sp(Line.scala:43)
    at scala.tools.nsc.io.package$$anon$2.run(package.scala:25)
    at java.lang.Thread.run(Thread.java:744)
Re: can't sc.paralellize in Spark 0.7.3 spark-shell
Nevermind -- I'm like 90% sure the problem is that I'm importing stuff that declares a SparkContext as sc. If it's not, I'll report back. On Mon, Apr 14, 2014 at 2:55 PM, Walrus theCat walrusthe...@gmail.comwrote: Hi, Using the spark-shell, I can't sc.parallelize to get an RDD. Looks like a bug. scala sc.parallelize(Array(a,s,d)) java.lang.NullPointerException at init(console:17) at init(console:22) at init(console:24) at init(console:26) at init(console:28) at init(console:30) at init(console:32) at init(console:34) at init(console:36) at .init(console:40) at .clinit(console) at .init(console:11) at .clinit(console) at $export(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:629) at spark.repl.SparkIMain$Request$$anonfun$10.apply(SparkIMain.scala:890) at scala.tools.nsc.interpreter.Line$$anonfun$1.apply$mcV$sp(Line.scala:43) at scala.tools.nsc.io.package$$anon$2.run(package.scala:25) at java.lang.Thread.run(Thread.java:744)
interleave partitions?
Hi, I want to do something like this: rdd3 = rdd1.coalesce(N).partitions.zip(rdd2.coalesce(N).partitions) I realize the above will get me something like Array[(partition,partition)]. I hope you see what I'm going for here -- any tips on how to accomplish this? Thanks
Re: interleave partitions?
Answering my own question here. This may not be efficient, but this is what I came up with: rdd1.coalesce(N).glom.zip(rdd2.coalesce(N).glom).map { case(x,y) => x++y} On Wed, Mar 26, 2014 at 11:11 AM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I want to do something like this: rdd3 = rdd1.coalesce(N).partitions.zip(rdd2.coalesce(N).partitions) I realize the above will get me something like Array[(partition,partition)]. I hope you see what I'm going for here -- any tips on how to accomplish this? Thanks
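A variant of the same idea that skips materializing each partition as an array is zipPartitions, which hands you the two partitions' iterators directly; both sides must end up with exactly the same number of partitions. A sketch:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Sketch: concatenate corresponding partitions of two RDDs, partition by partition.
def interleave[T: ClassTag](rdd1: RDD[T], rdd2: RDD[T], n: Int): RDD[T] =
  rdd1.coalesce(n).zipPartitions(rdd2.coalesce(n)) { (it1, it2) => it1 ++ it2 }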
Re: coalescing RDD into equally sized partitions
For the record, I tried this, and it worked. On Wed, Mar 26, 2014 at 10:51 AM, Walrus theCat walrusthe...@gmail.comwrote: Oh so if I had something more reasonable, like RDD's full of tuples of say, (Int,Set,Set), I could expect a more uniform distribution? Thanks On Mon, Mar 24, 2014 at 11:11 PM, Matei Zaharia matei.zaha...@gmail.comwrote: This happened because they were integers equal to 0 mod 5, and we used the default hashCode implementation for integers, which will map them all to 0. There's no API method that will look at the resulting partition sizes and rebalance them, but you could use another hash function. Matei On Mar 24, 2014, at 5:20 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, sc.parallelize(Array.tabulate(100)(i=i)).filter( _ % 20 == 0 ).coalesce(5,true).glom.collect yields Array[Array[Int]] = Array(Array(0, 20, 40, 60, 80), Array(), Array(), Array(), Array()) How do I get something more like: Array(Array(0), Array(20), Array(40), Array(60), Array(80)) Thanks
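Another way to force the even spread, without touching hashCode: key each element by its position and partition on that key, since small consecutive Long keys hash to distinct buckets. A sketch with a spark-shell sc in scope (zipWithIndex needs a reasonably recent Spark; on older releases zipWithUniqueId or a hand-rolled index would stand in):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._

// Sketch: spread the five multiples of 20 across five partitions, one each.
val data = sc.parallelize(Array.tabulate(100)(i => i)).filter(_ % 20 == 0)
val spread = data
  .zipWithIndex()                      // (value, index) with index = 0..4
  .map { case (v, i) => (i, v) }       // key by the index instead of the value
  .partitionBy(new HashPartitioner(5)) // index mod 5 lands each key in its own partition
  .values

spread.glom().collect()  // roughly Array(Array(0), Array(20), Array(40), Array(60), Array(80))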
question about partitions
Hi, Quick question about partitions. If my RDD is partitioned into 5 partitions, does that mean that I am constraining it to exist on at most 5 machines? Thanks
Re: question about partitions
For instance, I need to work with an RDD in terms of N parts. Will calling RDD.coalesce(N) possibly cause processing bottlenecks? On Mon, Mar 24, 2014 at 1:28 PM, Walrus theCat walrusthe...@gmail.comwrote: Hi, Quick question about partitions. If my RDD is partitioned into 5 partitions, does that mean that I am constraining it to exist on at most 5 machines? Thanks
Re: question about partitions
Syed, Thanks for the tip. I'm not sure if coalesce is doing what I'm intending to do, which is, in effect, to subdivide the RDD into N parts (by calling coalesce and doing operations on the partitions.) It sounds like, however, this won't bottleneck my processing power. If this sets off any alarms for anyone, feel free to chime in. On Mon, Mar 24, 2014 at 2:50 PM, Syed A. Hashmi shas...@cloudera.comwrote: RDD.coalesce should be fine for rebalancing data across all RDD partitions. Coalesce is pretty handy in situations where you have sparse data and want to compact it (e.g. data after applying a strict filter) OR you know the magic number of partitions according to your cluster which will be optimal. One point to watch out though is that if N is greater than your current partitions, you need to pass shuffle=true to coalesce. If N is less than your current partitions (i.e. you are shrinking partitions, do not set shuffle=true, otherwise it will cause additional unnecessary shuffle overhead. On Mon, Mar 24, 2014 at 2:32 PM, Walrus theCat walrusthe...@gmail.comwrote: For instance, I need to work with an RDD in terms of N parts. Will calling RDD.coalesce(N) possibly cause processing bottlenecks? On Mon, Mar 24, 2014 at 1:28 PM, Walrus theCat walrusthe...@gmail.comwrote: Hi, Quick question about partitions. If my RDD is partitioned into 5 partitions, does that mean that I am constraining it to exist on at most 5 machines? Thanks
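A small sketch of the shuffle-flag point above, for any existing RDD:

import org.apache.spark.rdd.RDD

// Sketch: shrinking the partition count can avoid a shuffle; growing it cannot.
def resize[T](rdd: RDD[T]): Unit = {
  val shrunk = rdd.coalesce(5)                  // narrow dependency, no shuffle
  val grown  = rdd.coalesce(64, shuffle = true) // growing requires shuffle = true
  val same   = rdd.repartition(64)              // shorthand for coalesce(64, shuffle = true)
  println(Seq(shrunk, grown, same).map(_.partitions.length)) // e.g. List(5, 64, 64) if rdd started with >= 5 partitions
}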
coalescing RDD into equally sized partitions
Hi, sc.parallelize(Array.tabulate(100)(i=>i)).filter( _ % 20 == 0 ).coalesce(5,true).glom.collect yields Array[Array[Int]] = Array(Array(0, 20, 40, 60, 80), Array(), Array(), Array(), Array()) How do I get something more like: Array(Array(0), Array(20), Array(40), Array(60), Array(80)) Thanks
Re: inexplicable exceptions in Spark 0.7.3
Hi Andrew, Thanks for your interest. This is a standalone job. On Mon, Mar 17, 2014 at 4:30 PM, Andrew Ash and...@andrewash.com wrote: Are you running from the spark shell or from a standalone job? On Mon, Mar 17, 2014 at 4:17 PM, Walrus theCat walrusthe...@gmail.comwrote: Hi, I'm getting this stack trace, using Spark 0.7.3. No references to anything in my code, never experienced anything like this before. Any ideas what is going on? java.lang.ClassCastException: spark.SparkContext$$anonfun$9 cannot be cast to scala.Function2 at spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:43) at spark.scheduler.ResultTask.readExternal(ResultTask.scala:106) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at spark.JavaDeserializationStream.readObject(JavaSerializer.scala:23) at spark.JavaSerializerInstance.deserialize(JavaSerializer.scala:45) at spark.executor.Executor$TaskRunner.run(Executor.scala:96) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)
inexplicable exceptions in Spark 0.7.3
Hi, I'm getting this stack trace, using Spark 0.7.3. No references to anything in my code, never experienced anything like this before. Any ideas what is going on?

java.lang.ClassCastException: spark.SparkContext$$anonfun$9 cannot be cast to scala.Function2
    at spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:43)
    at spark.scheduler.ResultTask.readExternal(ResultTask.scala:106)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at spark.JavaDeserializationStream.readObject(JavaSerializer.scala:23)
    at spark.JavaSerializerInstance.deserialize(JavaSerializer.scala:45)
    at spark.executor.Executor$TaskRunner.run(Executor.scala:96)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)