When you are doing sc.textFile() you can actually pass a second parameter, minPartitions, which is the minimum number of partitions (and hence tasks) for the read.
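Something like this, for example -- an untested sketch (the path and app
name are placeholders, and note the split-size key in the second half
applies to the new Hadoop API, not the old-API path that textFile()
itself reads through):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("more-tasks"))

    // Hint at least 500 input splits. With the old Hadoop API behind
    // textFile(), this shrinks the goal split size (512 MB / 500 is
    // roughly 1 MB), so you get ~500 tasks without a repartition()
    // shuffle.
    val lines = sc.textFile("hdfs:///path/to/input", 500)
    println(lines.partitions.length)

    // If you instead read through the new Hadoop API
    // (newAPIHadoopFile and friends), the equivalent knob is the max
    // split size, e.g. 64 MB:
    sc.hadoopConfiguration.set(
      "mapreduce.input.fileinputformat.split.maxsize",
      (64 * 1024 * 1024).toString)

With 512 MB of input, asking for 500 minimum partitions drives the goal
split size down to about 1 MB -- the same effect as shrinking the split
size to get more mappers in Hadoop M/R.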
Thanks
Best Regards

On Fri, Jun 26, 2015 at 12:40 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> How can I increase the number of tasks from 174 to 500 without running
> repartition?
>
> The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to
> 64 MB so as to increase the number of tasks, similar to the split size
> that increases the number of mappers in Hadoop M/R?
>
> On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> Look in the tuning section
>> <https://spark.apache.org/docs/latest/tuning.html>; you also need to
>> figure out what's taking time, where your bottleneck is, etc. If
>> everything is tuned properly, then you will need to throw more cores
>> at it :)
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>
>>> It's taking an hour, and on Hadoop it takes 1h 30m. Is there a way
>>> to make it run faster?
>>>
>>> On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>
>>>> Cool. :)
>>>> On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:
>>>>
>>>>> It's running now.
>>>>>
>>>>> On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>
>>>>>> Now running with
>>>>>>
>>>>>> --num-executors 9973 --driver-memory 14g --driver-java-options
>>>>>> "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g
>>>>>> --executor-cores 1
>>>>>>
>>>>>> On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>
>>>>>>> There are multiple of these:
>>>>>>>
>>>>>>> 1)
>>>>>>> 15/06/24 09:53:37 ERROR executor.Executor: Exception in task
>>>>>>> 443.0 in stage 3.0 (TID 1767)
>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>   at sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown Source)
>>>>>>>   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>   at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
>>>>>>>   at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>>>>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>   at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>>>>>>>   at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
>>>>>>>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>   at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>>   at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>   at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
>>>>>>>   at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>   at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS'
>>>>>>> timer thread
>>>>>>>
>>>>>>> 2)
>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS'
>>>>>>> timer thread
>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>   at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
>>>>>>>   at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
>>>>>>>   at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
>>>>>>>   at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> 3)
>>>>>>> # java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>> # -XX:OnOutOfMemoryError="kill %p"
>>>>>>> # Executing /bin/sh -c "kill 20674"...
>>>>>>>
>>>>>>> [ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5]
>>>>>>> [akka.tcp://sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/]
>>>>>>> swallowing exception during message send
>>>>>>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>>>>>>>
>>>>>>> On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I see this:
>>>>>>>>
>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>   at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>   at java.lang.String.<init>(String.java:203)
>>>>>>>>   at java.lang.StringBuilder.toString(StringBuilder.java:405)
>>>>>>>>   at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
>>>>>>>>   at java.io.File.<init>(File.java:367)
>>>>>>>>   at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
>>>>>>>>   at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
>>>>>>>>   at org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
>>>>>>>>   at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
>>>>>>>>   at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
>>>>>>>>   at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>   at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>>>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>>>>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>>>>>   at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>>>>>>>   at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>>>>>>>   at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>>>>>>>   at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>>>>>>>   at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>   at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>   at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>   at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>
>>>>>>>> On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Can you look a bit more in the error logs? It could be getting
>>>>>>>>> killed because of OOM etc. One thing you can try is to set
>>>>>>>>> spark.shuffle.blockTransferService to nio from netty.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I have a Spark job that has 7 stages. The first 3 stages
>>>>>>>>>> complete and the fourth stage begins (it joins two RDDs). This
>>>>>>>>>> stage has multiple task failures, all with the exception below.
>>>>>>>>>>
>>>>>>>>>> Multiple tasks (100s of them) get the same exception on
>>>>>>>>>> different hosts. How can all the hosts suddenly stop responding
>>>>>>>>>> when a few moments ago 3 stages ran successfully? If I re-run
>>>>>>>>>> the job, the three stages will again run successfully, so I
>>>>>>>>>> cannot think of it being a cluster issue.
>>>>>>>>>>
>>>>>>>>>> Any suggestions?
>>>>>>>>>>
>>>>>>>>>> Spark Version: 1.3.1
>>>>>>>>>>
>>>>>>>>>> Exception:
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST
>>>>>>>>>>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>>>>>>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>   at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>   at org.apache.sp
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Deepak
>
> --
> Deepak
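
For anyone following this thread later: the switch Akhil suggests above
is set like so -- a hedged sketch for Spark 1.x (1.3.1 here; the nio
transfer service was removed in later Spark releases, so check your
version before relying on it):

    // On the command line:
    //   spark-submit --conf spark.shuffle.blockTransferService=nio ...

    import org.apache.spark.{SparkConf, SparkContext}

    // Or in code, before the SparkContext is created: fall back from
    // the default netty block transfer service to nio while chasing
    // the FetchFailedException / executor OOMs discussed above.
    val conf = new SparkConf()
      .setAppName("shuffle-nio")
      .set("spark.shuffle.blockTransferService", "nio")
    val sc = new SparkContext(conf)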