20 nodes. As part of an iterative algorithm I need to do a join. One dataset is 100g while the other is 10g. I was hoping to speed it up with a broadcast join (in Pig, I joined these two datasets using their "replicated join" feature, and it made a big difference).
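For reference, the "replicated join" idea can be sketched in Spark with a broadcast variable: collect the small side to the driver, broadcast it once to every executor, and join on the map side so the large dataset is never shuffled. This is only a sketch — the names (`broadcastJoin`, `bigRdd`, `smallRdd`) are illustrative, and it assumes the smaller dataset fits comfortably in each executor's memory (broadcasting ~10g per executor is itself heavy, which is part of what this thread runs into):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hypothetical sketch of a map-side ("replicated") join via broadcast.
def broadcastJoin(sc: SparkContext,
                  bigRdd: RDD[(String, Int)],
                  smallRdd: RDD[(String, Int)]): RDD[(String, (Int, Int))] = {
  // Collect the small side to the driver, then ship it to each executor once.
  val smallMap = sc.broadcast(smallRdd.collectAsMap())
  // Join on the map side: each partition of the big RDD probes the local
  // copy of the small table, so the big RDD is never shuffled.
  bigRdd.flatMap { case (k, v) =>
    smallMap.value.get(k).map(w => (k, (v, w)))
  }
}
```

This keeps only keys present on both sides (an inner join), mirroring what `RDD.join` would produce.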
Since the usual RDD.join seems to be working, I'll stop messing around with broadcasts.

On Sat, Oct 5, 2013 at 5:22 PM, Stoney Vintson <[email protected]> wrote:
> Ultimately, what do you want to do with the 10GBs of results? Do you want
> to write it back to hdfs, hbase, etc.? Are you going to store it in Tachyon
> and query it from shark? Why make a distributed IO write travel across the
> slower network and make it non-distributed?
>
> On Oct 5, 2013 5:06 PM, "Ryan Compton" <[email protected]> wrote:
>>
>> I have 128g for each node
>>
>> On Sat, Oct 5, 2013 at 4:58 PM, Reynold Xin <[email protected]> wrote:
>> > You probably shouldn't be collecting a 10g dataset, because that is
>> > going to put all the 10g to the driver node ...
>> >
>> > On Fri, Oct 4, 2013 at 6:53 PM, Ryan Compton <[email protected]> wrote:
>> >>
>> >> Some hints: I'm doing collect() on a large (~10g??) dataset. If I
>> >> shrink that down, I have no problems. I've tried
>> >>
>> >> System.setProperty("spark.akka.frameSize", "15420")
>> >>
>> >> But then I get:
>> >>
>> >> 13/10/04 18:49:33 ERROR client.Client$ClientActor: Failed to connect to master
>> >> org.jboss.netty.channel.ChannelPipelineException: Failed to initialize a pipeline.
>> >>     at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:209)
>> >>     at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:183)
>> >>     at akka.remote.netty.ActiveRemoteClient$$anonfun$connect$1.apply$mcV$sp(Client.scala:173)
>> >>     at akka.util.Switch.liftedTree1$1(LockUtil.scala:33)
>> >>     at akka.util.Switch.transcend(LockUtil.scala:32)
>> >>     at akka.util.Switch.switchOn(LockUtil.scala:55)
>> >>     at akka.remote.netty.ActiveRemoteClient.connect(Client.scala:158)
>> >>     at akka.remote.netty.NettyRemoteTransport.send(NettyRemoteSupport.scala:153)
>> >>     at akka.remote.RemoteActorRef.$bang(RemoteActorRefProvider.scala:247)
>> >>     at org.apache.spark.deploy.client.Client$ClientActor.preStart(Client.scala:61)
>> >>     at akka.actor.ActorCell.create$1(ActorCell.scala:508)
>> >>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:600)
>> >>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:209)
>> >>     at akka.dispatch.Mailbox.run(Mailbox.scala:178)
>> >>     at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
>> >>     at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
>> >>     at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
>> >>     at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
>> >>     at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
>> >> Caused by: java.lang.IllegalArgumentException: maxFrameLength must be a positive integer: -1010827264
>> >>     at org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.<init>(LengthFieldBasedFrameDecoder.java:270)
>> >>     at org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.<init>(LengthFieldBasedFrameDecoder.java:236)
>> >>     at akka.remote.netty.ActiveRemoteClientPipelineFactory.getPipeline(Client.scala:340)
>> >>     at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:207)
>> >>     ... 18 more
>> >> 13/10/04 18:49:33 ERROR cluster.SparkDeploySchedulerBackend: Disconnected from Spark cluster!
>> >> 13/10/04 18:49:33 ERROR cluster.ClusterScheduler: Exiting due to error from cluster scheduler: Disconnected from Spark cluster
>> >>
>> >> On Fri, Oct 4, 2013 at 5:31 PM, Ryan Compton <[email protected]> wrote:
>> >> > When I turn on Kryo serialization in 0.8 my jobs fail with these
>> >> > errors and I don't understand what's going wrong. Any ideas?
>> >> >
>> >> > I've got these properties:
>> >> >
>> >> > // my usual spark props
>> >> > System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>> >> > System.setProperty("spark.kryo.registrator", classOf[OSIKryoRegistrator].getName)
>> >> > System.setProperty("spark.cores.max", "532")
>> >> > System.setProperty("spark.executor.memory", "92g")
>> >> > System.setProperty("spark.default.parallelism", "256")
>> >> > System.setProperty("spark.akka.frameSize", "1024")
>> >> > System.setProperty("spark.kryoserializer.buffer.mb", "24")
>> >> >
>> >> > And these errors:
>> >> >
>> >> > 13/10/04 17:21:56 INFO cluster.ClusterTaskSetManager: Loss was due to
>> >> > com.esotericsoftware.kryo.KryoException: Buffer overflow.
>> >> > Available: 6, required: 8
>> >> > Serialization trace:
>> >> > longitude (com.hrl.issl.osi.geometry.Location) [duplicate 2]
>> >> > 13/10/04 17:21:56 INFO cluster.ClusterTaskSetManager: Starting task
>> >> > 2.0:728 as TID 1490 on executor 17: node25 (PROCESS_LOCAL)
>> >> > 13/10/04 17:21:56 INFO cluster.ClusterTaskSetManager: Serialized task
>> >> > 2.0:728 as 1892 bytes in 0 ms
>> >> > 13/10/04 17:21:58 INFO cluster.ClusterTaskSetManager: Lost TID 1486 (task 2.0:730)
>> >> > 13/10/04 17:21:58 INFO cluster.ClusterTaskSetManager: Loss was due to
>> >> > com.esotericsoftware.kryo.KryoException
>> >> > com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 6, required: 8
>> >> > Serialization trace:
>> >> > latitude (com.hrl.issl.osi.geometry.Location)
>> >> >     at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
>> >> >     at com.esotericsoftware.kryo.io.Output.writeLong(Output.java:477)
>> >> >     at com.esotericsoftware.kryo.io.Output.writeDouble(Output.java:596)
>> >> >     at com.esotericsoftware.kryo.serializers.DefaultSerializers$DoubleSerializer.write(DefaultSerializers.java:137)
>> >> >     at com.esotericsoftware.kryo.serializers.DefaultSerializers$DoubleSerializer.write(DefaultSerializers.java:131)
>> >> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> >> >     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:576)
>> >> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> >> >     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >> >     at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38)
>> >> >     at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34)
>> >> >     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >> >     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>> >> >     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>> >> >     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >> >     at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:126)
>> >> >     at org.apache.spark.scheduler.TaskResult.writeExternal(TaskResult.scala:40)
>> >> >     at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1429)
>> >> >     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1398)
>> >> >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
>> >> >     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
>> >> >     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
>> >> >     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:47)
>> >> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:171)
>> >> >     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> >> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> >> >     at java.lang.Thread.run(Thread.java:662)
>> >> > 13/10/04 17:21:58 ERROR cluster.ClusterTaskSetManager: Task 2.0:730 failed more than 4 times; aborting job
>> >> > 13/10/04 17:21:58 INFO cluster.ClusterScheduler: Remove TaskSet 2.0 from pool
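A note on the Kryo error above: "Buffer overflow. Available: 6, required: 8" means Kryo ran out of room in its fixed-size output buffer while serializing a task result — the result was just over the 24 MB allowed by spark.kryoserializer.buffer.mb. The usual remedy is to raise that buffer so it can hold the largest single serialized object. A sketch, using the 0.8-era property names from the thread (the value 256 is illustrative, not a recommendation from the list):

```scala
// Sketch: same Kryo setup as in the quoted message, but with a larger
// per-task serialization buffer. spark.kryoserializer.buffer.mb must be
// large enough for the biggest object Kryo ever has to serialize at once.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", classOf[OSIKryoRegistrator].getName)
System.setProperty("spark.kryoserializer.buffer.mb", "256") // was "24"
```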

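A note on the frameSize failure quoted earlier: the negative maxFrameLength in the Netty exception is exactly what 32-bit integer overflow produces when 15420 MB is converted to bytes — 15420 * 1024 * 1024 exceeds Int.MaxValue and wraps to -1010827264, the very number in the error. A quick demonstration in plain Scala (no Spark needed):

```scala
// spark.akka.frameSize is interpreted in megabytes. Converting 15420 MB
// to bytes with Int arithmetic wraps around to a negative number, which
// matches "maxFrameLength must be a positive integer: -1010827264".
val frameSizeMb = 15420
val bytesAsInt  = frameSizeMb * 1024 * 1024          // Int math wraps
val bytesAsLong = frameSizeMb.toLong * 1024 * 1024   // true byte count
println(bytesAsInt)    // -1010827264
println(bytesAsLong)   // 16169041920
```

Since Int.MaxValue is 2147483647 (just under 2048 MB), any frameSize of 2048 or more overflows; keeping spark.akka.frameSize below 2048 avoids the negative maxFrameLength, though shipping ~10g of results through collect() remains a questionable plan, as the rest of the thread points out.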