Any solutions to solve this exception?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:389)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
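In most cases this exception is a symptom rather than the root cause: it appears when the executors that wrote the map output have been lost (often killed by YARN for exceeding memory limits), so reducers can no longer locate the shuffle files. The sketch below is one hedged starting point, not a confirmed fix for this job: it raises shuffle parallelism and the YARN memory overhead so that individual tasks and shuffle blocks stay small. The numeric values are illustrative assumptions, not figures taken from this thread.

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative settings only; tune to the actual cluster and data volume.
    val conf = new SparkConf()
      .setAppName("shuffle-tuning-sketch")
      // More shuffle partitions -> smaller blocks per reducer, less chance of
      // any single fetch or spill approaching the 2 GB block limit.
      .set("spark.default.parallelism", "2000")
      // Extra off-heap headroom so YARN does not kill executors mid-shuffle,
      // which is what typically produces "Missing an output location".
      .set("spark.yarn.executor.memoryOverhead", "2048")
      // Retry shuffle fetches a few times before failing the stage outright.
      .set("spark.shuffle.io.maxRetries", "10")

    val sc = new SparkContext(conf)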
On Mon, Jul 13, 2015 at 11:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> genericRecordsAndKeys.persist(StorageLevel.MEMORY_AND_DISK) with 17 as the
> repartitioning argument is throwing this exception:
>
> 7/13 23:26:36 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15,
> (reason: User class threw exception: org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 14 in stage 2.0 failed 4 times, most recent failure:
> Lost task 14.3 in stage 2.0 (TID 37, phxaishdc9dn0725.phx.ebay.com):
> java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>         at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>         at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>         at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>         at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:509)
>         at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
>         at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>         at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>         at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>         at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>
> On Mon, Jul 13, 2015 at 10:37 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> I stopped at 35 repartitions as it takes around 12-14 minutes. I cached an RDD
>> because it is used in the next two tasks; however, caching slowed down the process.
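A note on the "Size exceeds Integer.MAX_VALUE" trace above: DiskStore memory-maps each block as a single ByteBuffer, so no individual cached or shuffled block may exceed 2 GB. With only 17 partitions, a persisted RDD whose deserialized footprint is far larger than the 100 MB Avro input can easily cross that limit. A hedged rule of thumb, not verified against this job, is to choose the partition count so each partition stays in the low hundreds of megabytes and to cache in serialized form; `genericRecordsAndKeys` below refers to the keyed RDD built in the code quoted next, and the 20 GB estimate is an illustrative guess.

    import org.apache.spark.storage.StorageLevel

    // Rough, assumption-laden sizing: estimate the expanded in-memory footprint
    // of the records (20 GB here is a guess, not a measured value) and target
    // ~256 MB per partition so no single block approaches the 2 GB mmap limit.
    val estimatedBytes      = 20L * 1024 * 1024 * 1024
    val targetPartitionSize = 256L * 1024 * 1024
    val numPartitions       = math.max(200, (estimatedBytes / targetPartitionSize).toInt)

    // `genericRecordsAndKeys` is the keyed RDD from the code quoted below.
    val repartitioned = genericRecordsAndKeys.repartition(numPartitions)

    // Serialized caching keeps individual blocks much smaller than the default
    // deserialized MEMORY_AND_DISK representation.
    repartitioned.persist(StorageLevel.MEMORY_AND_DISK_SER)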
>>
>> Code:
>>
>> val genericRecordsAndKeys = inputRecords.map { record =>
>>   val rec = new MasterPrimeRecord(detail, record)
>>   val keyToOutput = new StringBuilder("")
>>   dimensions.foreach { dim =>
>>     keyToOutput.append("_" + rec.get(dim).toString)
>>   }
>>   (keyToOutput.toString, rec)
>> }
>> genericRecordsAndKeys.cache
>>
>> val quantiles = genericRecordsAndKeys
>>   .map { case (keyToOutput, rec) =>
>>     val digest: TDigest = TDigest.createAvlTreeDigest(10)
>>     val fpPaidGMB = rec.get("fpPaidGMB").asInstanceOf[Double]
>>     digest.add(fpPaidGMB)
>>     val bbuf: ByteBuffer = ByteBuffer.allocate(digest.byteSize())
>>     digest.asBytes(bbuf)
>>     (keyToOutput, bbuf.array())
>>   }.reduceByKey { case (v1, v2) =>
>>     val tree1 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v1.asInstanceOf[scala.Array[Byte]]))
>>     val tree2 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v2.asInstanceOf[scala.Array[Byte]]))
>>     tree1.add(tree2)
>>     tree1.compress()
>>     val bbuf: ByteBuffer = ByteBuffer.allocate(tree1.byteSize())
>>     tree1.asBytes(bbuf)
>>     bbuf.array
>>   }
>>
>> val outputRecords: RDD[(AvroKey[MasterPrimeRecord], NullWritable)] =
>>   genericRecordsAndKeys.join(quantiles).map { case (k, v) =>
>>     val masterPrimeRec = v._1
>>     val mergedTree = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v._2))
>>     val capVal = mergedTree.quantile(0.999)
>>     if (masterPrimeRec.get("fpPaidGMB").asInstanceOf[Double] > capVal) {
>>       masterPrimeRec.put("fpPaidGMB", capVal)
>>     }
>>     val wrap = new AvroKey[MasterPrimeRecord](masterPrimeRec)
>>     (wrap, NullWritable.get)
>>   }
>>
>> On Mon, Jul 13, 2015 at 9:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>
>>> My guess worked fine now. The repartition took approximately 1/4 the time
>>> once I reduced the number of partitions, and the rest of the process took
>>> about 1/4 extra time, but that is OK.
>>>
>>> On Mon, Jul 13, 2015 at 9:46 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>
>>>> I reduced the number of partitions to 1/4 (to 76) in order to cut the
>>>> repartition time to 1/4 (from 33 to 8), but the repartition is still
>>>> running beyond 15 minutes.
>>>>
>>>> @Nirmal
>>>> Clicking on details shows the code lines but not why it is slow. I know
>>>> the repartition is slow and want to speed it up.
>>>>
>>>> @Sharma
>>>> I have seen that increasing the cores speeds up the repartition, but it
>>>> slows down the rest of the stages in the job plan.
>>>>
>>>> I need some logical explanation and math to know beforehand; otherwise,
>>>> with Spark I am always firing in the dark. Spark has been depressingly
>>>> lackluster so far (the join use case and now a simple outlier detection
>>>> using TDigest).
>>>>
>>>> On Mon, Jul 13, 2015 at 9:37 PM, Aniruddh Sharma <asharma...@gmail.com> wrote:
>>>>
>>>>> Hi Deepak
>>>>>
>>>>> Not 100% sure, but please try increasing (--executor-cores) to twice the
>>>>> number of physical cores on your machine.
>>>>>
>>>>> Thanks and Regards
>>>>> Aniruddh
>>>>>
>>>>> On Tue, Jul 14, 2015 at 9:49 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>
>>>>>> It has been 30 minutes and the repartition still has not completed; it
>>>>>> seems to run forever.
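One possible way to remove the most expensive shuffle in the code quoted above: `quantiles` ends up holding one small serialized digest per dimension key, so if the number of distinct keys is modest it can be collected to the driver and broadcast, replacing the `join` (which re-shuffles the full `genericRecordsAndKeys` RDD) with a plain `map`. This is only a sketch against the names used in the thread (`sc`, `genericRecordsAndKeys`, `quantiles`, `MasterPrimeRecord`), and it assumes the key cardinality actually fits comfortably in driver memory.

    import java.nio.ByteBuffer
    import com.tdunning.math.stats.AVLTreeDigest
    import org.apache.avro.mapred.AvroKey
    import org.apache.hadoop.io.NullWritable

    // Turn each per-key digest into its 99.9th-percentile cap and pull the
    // resulting (key -> cap) map to the driver. Assumes the number of distinct
    // dimension keys is small enough to collect.
    val caps: Map[String, Double] = quantiles.mapValues { bytes =>
      AVLTreeDigest.fromBytes(ByteBuffer.wrap(bytes)).quantile(0.999)
    }.collectAsMap().toMap

    val capsBroadcast = sc.broadcast(caps)

    // No join and no second shuffle of the large RDD: each record looks up its
    // cap in the broadcast map and is clipped in place.
    val outputRecords = genericRecordsAndKeys.map { case (key, rec) =>
      val capVal = capsBroadcast.value(key)
      if (rec.get("fpPaidGMB").asInstanceOf[Double] > capVal) {
        rec.put("fpPaidGMB", capVal)
      }
      (new AvroKey[MasterPrimeRecord](rec), NullWritable.get)
    }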
>>>>>>
>>>>>> Without repartition, I see this error:
>>>>>> https://issues.apache.org/jira/browse/SPARK-5928
>>>>>>
>>>>>> FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
>>>>>> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
>>>>>>         at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>>         at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>         at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>
>>>>>> On Mon, Jul 13, 2015 at 8:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>
>>>>>>> I have 100 MB of Avro data, and repartition(307) is taking forever.
>>>>>>>
>>>>>>> 2. val x = input.repartition(7907).map( {k1,k2,k3,k4}, {inputRecord} )
>>>>>>> 3. val quantiles = x.map( {k1,k2,k3,k4}, TDigest(inputRecord).asBytes ).reduceByKey()  [this was groupBy earlier]
>>>>>>> 4. x.join(quantiles).coalesce(100).writeInAvro
>>>>>>>
>>>>>>> Attached is the full Scala code.
>>>>>>>
>>>>>>> I have a 340-node YARN cluster with 14 GB of RAM on each node, and the
>>>>>>> input data is just 100 MB. (Hadoop takes 2.5 hours on a 1 TB dataset.)
>>>>>>>
>>>>>>> ./bin/spark-submit -v --master yarn-cluster --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.4/lib/spark_reporting_dep_only-1.0-SNAPSHOT.jar --num-executors 330 --driver-memory 14g --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" --executor-memory 14g --executor-cores 1 --queue hdmi-others --class com.ebay.ep.poc.spark.reporting.SparkApp /home/dvasthimal/spark1.4/lib/spark_reporting-1.0-SNAPSHOT.jar startDate=2015-06-20 endDate=2015-06-21 input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=ppwmasterprime output=/user/dvasthimal/epdatasets/ppwmasterprime buffersize=128 maxbuffersize=1068 maxResultSize=200G
>>>>>>>
>>>>>>> I see this in the stdout of the task on that executor:
>>>>>>>
>>>>>>> 15/07/13 19:58:48 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
>>>>>>> 15/07/13 20:00:08 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (1 time so far)
>>>>>>> 15/07/13 20:01:31 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (2 times so far)
>>>>>>> 15/07/13 20:03:07 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (3 times so far)
>>>>>>> 15/07/13 20:04:32 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (4 times so far)
>>>>>>> 15/07/13 20:06:21 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (5 times so far)
>>>>>>> 15/07/13 20:08:09 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (6 times so far)
>>>>>>> 15/07/13 20:09:51 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (7 times so far)
>>>>>>>
>>>>>>> Also attached is the thread dump.
>>>>>>>
>>>>>>> --
>>>>>>> Deepak
>>>>>>>

--
Deepak
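A 100 MB Avro input that produces 2.2 GB in-memory sort maps and shuffle frames over 2 GB usually points to either heavy record expansion or key skew, where a few concatenated dimension keys own most of the records and land in single reduce tasks. Before tuning partition counts further, it may be worth measuring the skew directly; a small hedged sketch, reusing the `genericRecordsAndKeys` name from the thread:

    // Count records per dimension key and inspect the heaviest keys. If a
    // handful of keys dominate, repartitioning alone will not help: the hot
    // keys still funnel into single reduce tasks and single >2 GB shuffle blocks.
    val keyCounts = genericRecordsAndKeys
      .mapValues(_ => 1L)
      .reduceByKey(_ + _)

    val heaviest = keyCounts.top(20)(Ordering.by[(String, Long), Long](_._2))

    heaviest.foreach { case (key, count) => println(s"$key -> $count records") }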