Could it be that it's not supported with Avro? Unlikely.

On Fri, Jun 26, 2015 at 8:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
> My imports:
>
> import org.apache.avro.generic.GenericData
> import org.apache.avro.generic.GenericRecord
> import org.apache.avro.mapred.AvroKey
> import org.apache.avro.Schema
> import org.apache.hadoop.io.NullWritable
> import org.apache.avro.mapreduce.AvroKeyInputFormat
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.FileSystem
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.io.Text
>
> def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, endDate: Date) = {
>   val path = getInputPaths(inputDir, startDate, endDate)
>   val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
>   hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>   sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
> }
>
> I need to read Avro datasets, and I am using strings instead of the constants from the InputFormat class.
>
> When I click on any Hadoop dependency from Eclipse, I see they point to Hadoop 2.2.x jars.
>
> On Fri, Jun 26, 2015 at 7:44 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>
>> Make sure you’re importing the right namespace for Hadoop v2.0. This is what I tried:
>>
>> import org.apache.hadoop.io.{LongWritable, Text}
>> import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
>>
>> val hadoopConf = new org.apache.hadoop.conf.Configuration()
>> hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)
>>
>> val input = sc.newAPIHadoopFile(
>>   "README.md",
>>   classOf[TextInputFormat],
>>   classOf[LongWritable],
>>   classOf[Text],
>>   hadoopConf).map(_._2.toString())
>>
>> println(input.partitions.size)
>>
>> input.
>>   flatMap(_.split(" ")).
>>   filter(_.length > 0).
>>   map((_, 1)).
>>   reduceByKey(_ + _).
>>   coalesce(1).
>>   sortBy(_._2, false).
>>   take(10).
>>   foreach(println)
>>
>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>> Date: Friday, June 26, 2015 at 10:18 AM
>> To: Silvio Fiorito
>> Cc: user
>> Subject: Re:
>>
>> All of these throw a compilation error at newAPIHadoopFile:
>>
>> 1)
>> val hadoopConfiguration = new Configuration()
>> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>> sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey], classOf[NullWritable], classOf[AvroKeyInputFormat], hadoopConfiguration)
>>
>> 2)
>> val hadoopConfiguration = new Configuration()
>> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>> sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>>
>> 3)
>> val hadoopConfiguration = new Configuration()
>> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>>
>> Error:
>>
>> [ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37: error: overloaded method value newAPIHadoopFile with alternatives:
>> [INFO]   (path: String, fClass: Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], kClass: Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], vClass: Class[org.apache.hadoop.io.NullWritable], conf: org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] <and>
>> [INFO]   (path: String)(implicit km: scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable], implicit fm: scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]
>> [INFO]  cannot be applied to (String, Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], Class[org.apache.hadoop.io.NullWritable], Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], org.apache.hadoop.conf.Configuration)
>> [INFO]     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
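Looking at the first alternative in that error, the overload that takes explicit classes expects the input-format class before the key and value classes: (path, fClass, kClass, vClass, conf). So the compiler is rejecting the argument order, not Avro itself. An untested sketch of the reordered call, assuming the same sc, path and imports as in readGenericRecords above:

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapreduce.AvroKeyInputFormat
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.NullWritable

    val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
    // 64 MB max split size, to get more (smaller) input partitions
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

    val records = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](
      path + "/*.avro",
      classOf[AvroKeyInputFormat[GenericRecord]], // fClass: the InputFormat class comes first in this overload
      classOf[AvroKey[GenericRecord]],            // kClass
      classOf[NullWritable],                      // vClass
      hadoopConfiguration)

If the split setting takes effect, records.partitions.size should go up accordingly ("records" is just an illustrative name).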
>>
>> On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>>
>>> Ok, in that case I think you can set the max split size in the Hadoop config object, using the FileInputFormat.SPLIT_MAXSIZE config parameter.
>>>
>>> Again, I haven’t done this myself, but looking through the Spark codebase here:
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053
>>> and the HDFS FileInputFormat implementation, that seems like a good option to try.
>>>
>>> You should be able to call conf.setLong(FileInputFormat.SPLIT_MAXSIZE, max).
>>>
>>> I hope that helps!
>>>
>>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>> Date: Thursday, June 25, 2015 at 5:49 PM
>>> To: Silvio Fiorito
>>> Cc: user
>>> Subject: Re:
>>>
>>> I use
>>>
>>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
>>>
>>> https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String, java.lang.Class, java.lang.Class, java.lang.Class, org.apache.hadoop.conf.Configuration)
>>>
>>> It does not seem to have that partition option.
>>>
>>> On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>>>
>>>> Hi Deepak,
>>>>
>>>> Have you tried specifying the minimum partitions when you load the file? I haven’t tried that myself against HDFS before, so I’m not sure if it will affect data locality. Ideally it should not; it should still maintain data locality, just with more partitions. Once your job runs, you can check in the Spark tasks web UI to ensure they’re all node-local.
>>>>
>>>> val details = sc.textFile("hdfs://….", 500)
>>>>
>>>> If you’re using something other than a text file, you can also specify minimum partitions when using sc.hadoopFile.
>>>>
>>>> Thanks,
>>>> Silvio
>>>>
>>>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>> Date: Thursday, June 25, 2015 at 3:10 PM
>>>> To: Akhil Das
>>>> Cc: user
>>>> Subject: Re:
>>>>
>>>> How can I increase the number of tasks from 174 to 500 without running repartition?
>>>>
>>>> The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to 64 MB so as to increase the number of tasks, similar to the split size that increases the number of mappers in Hadoop M/R?
>>>>
>>>> On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>
>>>>> Look in the tuning section <https://spark.apache.org/docs/latest/tuning.html>. You also need to figure out what is taking time and where your bottleneck is. If everything is tuned properly, then you will need to throw more cores at it :)
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>
>>>>>> It is taking an hour, and on Hadoop it takes 1h 30m. Is there a way to make it run faster?
>>>>>>
>>>>>> On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Cool. :)
>>>>>>> On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> It's running now.
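On Silvio's note above that sc.hadoopFile accepts a minimum-partitions hint even for non-text input: a sketch using the old (mapred) Avro API, with org.apache.avro.mapred.AvroInputFormat and AvroWrapper. I have not tried it against this dataset, so treat the class choices as an assumption to verify:

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
    import org.apache.hadoop.io.NullWritable

    // Old (mapred) API: the last argument is the minimum number of partitions to request.
    val oldApiRecords = sc.hadoopFile(
      path + "/*.avro",
      classOf[AvroInputFormat[GenericRecord]],
      classOf[AvroWrapper[GenericRecord]],
      classOf[NullWritable],
      500)

The new-API newAPIHadoopFile has no such argument, which matches the JavaDoc link above; there, the split.maxsize setting is the equivalent lever.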
>>>>>>>>
>>>>>>>> On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Now running with
>>>>>>>>>
>>>>>>>>> --num-executors 9973 --driver-memory 14g --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g --executor-cores 1
>>>>>>>>>
>>>>>>>>> On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> There are multiple of these:
>>>>>>>>>>
>>>>>>>>>> 1)
>>>>>>>>>> 15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0 in stage 3.0 (TID 1767)
>>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>> at sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown Source)
>>>>>>>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>>>> at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>>>>>>>>>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>> at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
>>>>>>>>>> at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>>>>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>> at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS' timer thread
>>>>>>>>>>
>>>>>>>>>> 2)
>>>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS' timer thread
>>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>> at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
>>>>>>>>>> at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
>>>>>>>>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
>>>>>>>>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>
>>>>>>>>>> 3)
>>>>>>>>>> # java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>> # -XX:OnOutOfMemoryError="kill %p"
>>>>>>>>>> #   Executing /bin/sh -c "kill 20674"...
>>>>>>>>>> [ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5] [akka.tcp://sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I see this:
>>>>>>>>>>>
>>>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>> at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>> at java.lang.String.<init>(String.java:203)
>>>>>>>>>>> at java.lang.StringBuilder.toString(StringBuilder.java:405)
>>>>>>>>>>> at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
>>>>>>>>>>> at java.io.File.<init>(File.java:367)
>>>>>>>>>>> at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
>>>>>>>>>>> at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
>>>>>>>>>>> at org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
>>>>>>>>>>> at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
>>>>>>>>>>> at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
>>>>>>>>>>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>>>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>>>>>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>>>>>>>> at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>>>>>>>>>> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>>>>>>>>>> at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>>>>>>>>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>>>>>>>>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>>>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Can you look a bit more in the error logs? It could be getting killed because of OOM etc. One thing you can try is to set spark.shuffle.blockTransferService to nio from netty.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Best Regards
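For reference, Akhil's suggestion would look something like this when building the SparkContext; the app name is only a placeholder, and the same setting can instead be passed as --conf spark.shuffle.blockTransferService=nio on spark-submit:

    import org.apache.spark.{SparkConf, SparkContext}

    // Switch the shuffle block transfer service from the default (netty) to nio,
    // as suggested above. This is a Spark 1.x setting.
    val sparkConf = new SparkConf()
      .setAppName("ep-spark-report") // placeholder name, for illustration only
      .set("spark.shuffle.blockTransferService", "nio")
    val sc = new SparkContext(sparkConf)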
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I have a Spark job that has 7 stages. The first 3 stages complete and the fourth stage begins (it joins two RDDs). This stage has multiple task failures, all with the exception below.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Multiple tasks (100s of them) get the same exception on different hosts. How can all the hosts suddenly stop responding when, a few moments ago, 3 stages ran successfully? If I re-run, the first three stages will again run successfully. I cannot think of it being a cluster issue.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Spark Version: 1.3.1
>>>>>>>>>>>>>
>>>>>>>>>>>>> Exception:
>>>>>>>>>>>>>
>>>>>>>>>>>>> org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST
>>>>>>>>>>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>>>>>>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>> at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>>>> at org.apache.sp
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Deepak

--
Deepak