When you call sc.textFile() you can pass a second parameter, which is the
minimum number of partitions (minPartitions).
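
For example, a minimal sketch (the path and the partition count here are
placeholders):

    // The second argument (minPartitions) is a hint for the minimum number
    // of partitions; the actual count depends on the input format and file
    // layout, but this is how you push 174 tasks toward 500.
    val lines = sc.textFile("hdfs:///path/to/input", 500)
    println(lines.partitions.length)  // verify the resulting partition count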

Thanks
Best Regards

On Fri, Jun 26, 2015 at 12:40 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> How can I increase the number of tasks from 174 to 500 without running a
> repartition?
>
> The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to 64
> MB so as to increase the number of tasks, similar to the split size that
> increases the number of mappers in Hadoop M/R?
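>
> (An illustrative aside on the split-size analogy: minPartitions on
> textFile is the idiomatic lever, but if you read through the new Hadoop
> API you can also cap the split size directly, much like in M/R. The
> property name is the standard Hadoop one; the path is a placeholder.)
>
>     import org.apache.hadoop.io.{LongWritable, Text}
>     import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
>
>     // Cap input splits at 64 MB so the same input yields more, smaller tasks.
>     sc.hadoopConfiguration.set(
>       "mapreduce.input.fileinputformat.split.maxsize",
>       (64 * 1024 * 1024).toString)
>     val lines = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
>       "hdfs:///path/to/input").map(_._2.toString)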
>
> On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Have a look at the tuning section
>> <https://spark.apache.org/docs/latest/tuning.html>; you also need to
>> figure out what's taking time and where your bottleneck is. If everything
>> is tuned properly, then you will need to throw more cores at it :)
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>> wrote:
>>
>>> It's taking an hour, and on Hadoop it takes 1h 30m. Is there a way to
>>> make it run faster?
>>>
>>> On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> Cool. :)
>>>> On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:
>>>>
>>>>> It's running now.
>>>>>
>>>>> On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Now running with
>>>>>>
>>>>>> --num-executors 9973 --driver-memory 14g --driver-java-options
>>>>>> "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g
>>>>>> --executor-cores 1
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> There are multiple errors like these:
>>>>>>>
>>>>>>> 1)
>>>>>>> 15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0
>>>>>>> in stage 3.0 (TID 1767)
>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>> at
>>>>>>> sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown
>>>>>>> Source)
>>>>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>> at
>>>>>>> org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>> at
>>>>>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
>>>>>>> at
>>>>>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>> at
>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>> at
>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>> at
>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> at
>>>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS’
>>>>>>> timer thread
>>>>>>>
>>>>>>> 2)
>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS’
>>>>>>> timer thread
>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>> at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
>>>>>>> at
>>>>>>> akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
>>>>>>> at
>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
>>>>>>> at
>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>> 3)
>>>>>>> # java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>> # -XX:OnOutOfMemoryError="kill %p"
>>>>>>> #   Executing /bin/sh -c "kill 20674"...
>>>>>>> [ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5]
>>>>>>> [akka.tcp://sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/]
>>>>>>> swallowing exception during message send
>>>>>>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> I see this
>>>>>>>>
>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>> at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>> at java.lang.String.<init>(String.java:203)
>>>>>>>> at java.lang.StringBuilder.toString(StringBuilder.java:405)
>>>>>>>> at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
>>>>>>>> at java.io.File.<init>(File.java:367)
>>>>>>>> at
>>>>>>>> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
>>>>>>>> at
>>>>>>>> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
>>>>>>>> at
>>>>>>>> org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
>>>>>>>> at
>>>>>>>> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
>>>>>>>> at
>>>>>>>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
>>>>>>>> at
>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>> at
>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>> at
>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>> at
>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>> at
>>>>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>>> at
>>>>>>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>>>> at
>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>>>>> at
>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>>>>>>> at
>>>>>>>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>>>>>>> at
>>>>>>>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>>>>>>> at
>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>>>>>>> at
>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>> at
>>>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>> at
>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>> at
>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>> at
>>>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>> at
>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>> at
>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>> at
>>>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>> at
>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>
>>>>>>>> On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <
>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Can you look a bit more into the error logs? It could be getting
>>>>>>>>> killed because of OOM etc. One thing you can try is to set
>>>>>>>>> spark.shuffle.blockTransferService to nio instead of the default netty.
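>>>>>>>>>
>>>>>>>>> A minimal sketch of that switch (the property exists in Spark 1.x
>>>>>>>>> and was removed in later releases; you can equivalently pass it as
>>>>>>>>> a --conf flag to spark-submit):
>>>>>>>>>
>>>>>>>>>     import org.apache.spark.SparkConf
>>>>>>>>>
>>>>>>>>>     // Use the nio-based shuffle transfer service instead of the
>>>>>>>>>     // default netty one.
>>>>>>>>>     val conf = new SparkConf()
>>>>>>>>>       .set("spark.shuffle.blockTransferService", "nio")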
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I have a Spark job that has 7 stages. The first 3 stages complete
>>>>>>>>>> and the fourth stage begins (it joins two RDDs). This stage has
>>>>>>>>>> multiple task failures, all with the below exception.
>>>>>>>>>>
>>>>>>>>>> Multiple tasks (100s of them) get the same exception on different
>>>>>>>>>> hosts. How can all the hosts suddenly stop responding when a few
>>>>>>>>>> moments ago 3 stages ran successfully? If I re-run, the three
>>>>>>>>>> stages will again run successfully. I cannot think of it being a
>>>>>>>>>> cluster issue.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Any suggestions ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Spark Version : 1.3.1
>>>>>>>>>>
>>>>>>>>>> Exception:
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
>>>>>>>>>> HOST
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>      at 
>>>>>>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>      at 
>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>      at 
>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>      at 
>>>>>>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>      at org.apache.sp
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Deepak
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Deepak
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Deepak
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Deepak
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Deepak
>>>>>
>>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>
>
> --
> Deepak
>
>
