Is there any solution for this exception?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 1

On Mon, Jul 13, 2015 at 11:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <> wrote:

> Calling genericRecordsAndKeys.persist(StorageLevel.MEMORY_AND_DISK) with 17 as
> the repartition argument throws this exception:
> 7/13 23:26:36 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 14
> in stage 2.0 failed 4 times, most recent failure: Lost task 14.3 in stage
> 2.0 (TID 37, java.lang.RuntimeException:
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at ...
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
> at ...
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at scala.collection.TraversableLike$
> at scala.collection.mutable.ArrayOps$
> at ...
> On Mon, Jul 13, 2015 at 10:37 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <> wrote:
>> I stopped at 35 partitions, as the repartition takes around 12-14 minutes.
>> I cached an RDD because it is used in the next two steps. However, caching
>> slowed down the process.
>> Code:
>>     val genericRecordsAndKeys = {
>>       record =>
>>         val rec = new MasterPrimeRecord(detail, record)
>>         // Build the key as "_" + each dimension value, e.g. "_v1_v2_v3"
>>         val keyToOutput = new StringBuilder
>>         dimensions.foreach { dim =>
>>           keyToOutput.append("_").append(rec.get(dim).toString)
>>         }
>>         (keyToOutput.toString, rec)
>>     }
>>     // Reused by both the quantile computation and the join below
>>     genericRecordsAndKeys.cache()
>>     val quantiles = genericRecordsAndKeys
>>       .map { case (keyToOutput, rec) =>
>>         // One single-value digest per record, serialized so it can be shuffled
>>         val digest = TDigest.createAvlTreeDigest(10)
>>         digest.add(rec.get("fpPaidGMB").asInstanceOf[Double])
>>         val bbuf = ByteBuffer.allocate(digest.byteSize())
>>         digest.asBytes(bbuf)
>>         (keyToOutput, bbuf.array())
>>       }
>>       .reduceByKey { (v1, v2) =>
>>         // Deserialize both digests, merge them, and re-serialize the result
>>         val tree1 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v1))
>>         val tree2 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v2))
>>         tree1.add(tree2)
>>         tree1.compress()
>>         val bbuf = ByteBuffer.allocate(tree1.byteSize())
>>         tree1.asBytes(bbuf)
>>         bbuf.array()
>>       }
>>     val outputRecords: RDD[(AvroKey[MasterPrimeRecord], NullWritable)] =
>>       genericRecordsAndKeys.join(quantiles).map { case (_, (masterPrimeRec, digestBytes)) =>
>>         // Cap fpPaidGMB at the 99.9th percentile of its key's distribution
>>         val mergedTree = AVLTreeDigest.fromBytes(ByteBuffer.wrap(digestBytes))
>>         val capVal = mergedTree.quantile(0.999)
>>         if (masterPrimeRec.get("fpPaidGMB").asInstanceOf[Double] > capVal) {
>>           masterPrimeRec.put("fpPaidGMB", capVal)
>>         }
>>         (new AvroKey[MasterPrimeRecord](masterPrimeRec), NullWritable.get)
>>       }
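The join above caps fpPaidGMB at each key's 0.999 quantile, as estimated by T-Digest. To sanity-check that capping on a small local sample, here is a minimal pure-Scala sketch that replaces T-Digest with an exact nearest-rank quantile. The names QuantileCap, nearestRankQuantile and capByKey are hypothetical, and the records are plain (key, value) pairs rather than MasterPrimeRecord objects.

```scala
// Hypothetical local helper: an exact stand-in for the T-Digest estimate,
// meant only for checking the capping logic on small samples.
object QuantileCap {
  // Exact nearest-rank quantile: the value at 1-based rank ceil(q * n)
  // in sorted order. T-Digest only approximates this.
  def nearestRankQuantile(values: Seq[Double], q: Double): Double = {
    require(values.nonEmpty, "need at least one value")
    require(q > 0.0 && q <= 1.0, "q must be in (0, 1]")
    val sorted = values.sortWith(_ < _)
    val rank = math.ceil(q * sorted.size).toInt // 1-based rank
    sorted(rank - 1)
  }

  // Compute one cap per key, then clamp every value above its key's cap
  // (upper-tail winsorizing), mirroring the join of records with quantiles.
  def capByKey(records: Seq[(String, Double)], q: Double): Seq[(String, Double)] = {
    val caps: Map[String, Double] = records
      .groupBy { case (key, _) => key }
      .map { case (key, kvs) => key -> nearestRankQuantile(kvs.map(_._2), q) }
    records.map { case (key, v) => (key, math.min(v, caps(key))) }
  }
}
```

One consequence worth knowing: for a key with fewer than 1,000 records, the exact nearest-rank 0.999 quantile is simply that key's maximum, so nothing is capped. T-Digest interpolates, so it may return a value slightly below the maximum.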
>> On Mon, Jul 13, 2015 at 9:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <> wrote:
>>> My guess worked. The repartition took approximately 1/4 of the time once I
>>> reduced the number of partitions. The rest of the process took about 1/4
>>> more time, but that is OK.
>>> On Mon, Jul 13, 2015 at 9:46 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <> wrote:
>>>> I reduced the number of partitions to 1/4 (76) in order to cut the time to
>>>> 1/4 (from 33 to 8), but the repartition is still running after 15 minutes.
>>>> @Nirmal
>>>> Clicking on details shows the code lines, but not why it is slow. I know
>>>> that repartition is slow and want to speed it up.
>>>> @Sharma
>>>> I have seen that increasing the cores speeds up the repartition, but it
>>>> slows down the rest of the stages in the job plan.
>>>> I need a logical explanation and some math to know this beforehand;
>>>> otherwise, with Spark I am always firing in the dark. Spark has been
>>>> depressingly lackluster so far (the join use case, and now a simple
>>>> outlier detection using TDigest).
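For the back-of-envelope math asked for above, a minimal sketch of two quantities that grow with the partition count. The first is the number of shuffle blocks, which is (map tasks × reduce partitions). The second is the average number of bytes each partition receives. The sketch assumes keys spread evenly and treats the 100 MB input as 100 MiB of shuffled data. Real shuffle sizes differ after deserialization and compression. The map-task count is a hypothetical parameter because the thread does not state it, and ShuffleMath is a hypothetical name, not a Spark API.

```scala
// Hypothetical helpers for back-of-envelope shuffle sizing (not a Spark API).
object ShuffleMath {
  // m map tasks shuffling into r reduce partitions produce up to m * r
  // map-output blocks; each reducer fetches its block from every map task.
  def shuffleBlocks(mapTasks: Long, reducePartitions: Long): Long =
    mapTasks * reducePartitions

  // Average bytes landing in each reduce partition, assuming an even spread.
  def avgBytesPerPartition(totalBytes: Long, partitions: Int): Long = {
    require(partitions > 0, "partitions must be positive")
    totalBytes / partitions
  }

  // Average size of a single map-output block under the same assumption.
  def avgBytesPerBlock(totalBytes: Long, mapTasks: Long, reducePartitions: Long): Long = {
    val blocks = shuffleBlocks(mapTasks, reducePartitions)
    require(blocks > 0, "need at least one block")
    totalBytes / blocks
  }
}

// Usage: 100 MiB spread over the partition counts tried in this thread
val inputBytes = 100L * 1024 * 1024
for (p <- Seq(7907, 307, 76, 35, 17))
  println(s"$p partitions -> about ${ShuffleMath.avgBytesPerPartition(inputBytes, p)} bytes each")
```

At 7907 partitions, each reduce task gets about 13 KB on average. Per-task scheduling and per-block fetch overhead, rather than data volume, is therefore likely to dominate. The model does not predict where that tips over on a given cluster.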
>>>> On Mon, Jul 13, 2015 at 9:37 PM, Aniruddh Sharma <> wrote:
>>>>> Hi Deepak
>>>>> Not 100% sure, but please try increasing --executor-cores to twice the
>>>>> number of physical cores on your machine.
>>>>> Thanks and Regards
>>>>> Aniruddh
>>>>> On Tue, Jul 14, 2015 at 9:49 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <> wrote:
>>>>>> It's been 30 minutes and the partitioner still has not completed; it is
>>>>>> taking forever.
>>>>>> Without repartition, I see this error:
>>>>>>  FetchFailed(BlockManagerId(1,, 55028), shuffleId=1, mapId=0, reduceId=0, message=
>>>>>> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
>>>>>>  at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>>  at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>  at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
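The trace above shows a single fetched shuffle block of 3021252889 bytes, which is larger than 2147483647 (Int.MaxValue). The "Adjusted frame length exceeds" message appears to come from Netty's length-field frame decoder. The earlier "Size exceeds Integer.MAX_VALUE" error for the persisted RDD hits the same ceiling, because Java ByteBuffers, including those returned by FileChannel.map, are Int-indexed. Below is a minimal sketch of the resulting arithmetic, assuming data spreads evenly across partitions. ShuffleBlockLimit and its methods are hypothetical names.

```scala
// Hypothetical helper: checks block sizes against the Int-indexed 2 GiB ceiling.
object ShuffleBlockLimit {
  // A single shuffle or cached block must stay at or below Int.MaxValue bytes.
  val MaxBlockBytes: Long = Int.MaxValue.toLong

  def fits(blockBytes: Long): Boolean = blockBytes <= MaxBlockBytes

  // Smallest partition count whose average block fits under maxBytes
  // (ceiling division). Assumes an even spread: a single hot key still
  // lands in one partition no matter how many partitions there are.
  def minPartitions(totalBytes: Long, maxBytes: Long = MaxBlockBytes): Long = {
    require(totalBytes >= 0L && maxBytes > 0L, "sizes must be non-negative, limit positive")
    math.max(1L, (totalBytes + maxBytes - 1L) / maxBytes)
  }
}
```

Two partitions would clear the hard limit for this block, but each would still be about 1.5 GB. A more practical target is far below the ceiling. For example, `ShuffleBlockLimit.minPartitions(3021252889L, 128L * 1024 * 1024)` gives 23 for 128 MiB blocks.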
>>>>>> On Mon, Jul 13, 2015 at 8:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <> wrote:
>>>>>>> I have 100 MB of Avro data, and repartition(307) is taking forever.
>>>>>>> 2. val x = input.repartition(7907).map( {k1,k2,k3,k4}, {inputRecord} )
>>>>>>> 3. val quantiles = {k1,k2,k3,k4},  TDigest(inputRecord).asBytes ).reduceByKey()  [This was groupBy earlier]
>>>>>>> 4. x.join(quantiles).coalesce(100).writeInAvro
>>>>>>> Attached is the full Scala code.
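Item 3 notes that the quantile step moved from groupBy to reduceByKey. Below is a minimal pure-Scala simulation (not Spark's implementation) of why that shrinks the shuffle. reduceByKey merges values per key inside each partition before shuffling, while groupByKey ships every record. Partitions are modeled as plain Seqs and values as Int counts, and all names are hypothetical. In the thread's code the values are serialized T-Digests, so the merge is a digest merge rather than `+`, but the record-count argument is the same. Note that the simulation also shows the limit of the technique: all values for one key still meet in a single reducer.

```scala
// Pure-Scala simulation of map-side combining; hypothetical names, not Spark's code.
object MapSideCombine {
  type Partition = Seq[(String, Int)]

  // groupByKey-style: every record is sent through the shuffle.
  def recordsShuffledWithoutCombine(parts: Seq[Partition]): Int =
    parts.map(_.size).sum

  // reduceByKey-style map-side combine: merge values per key inside each
  // partition first, so at most one record per (partition, key) is shuffled.
  def combineWithin(part: Partition, f: (Int, Int) => Int): Partition =
    part.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }.toSeq

  def recordsShuffledWithCombine(parts: Seq[Partition], f: (Int, Int) => Int): Int =
    parts.map(p => combineWithin(p, f).size).sum

  // Reduce side: merge the partial results per key. The answer matches the
  // no-combine case because f is associative and commutative.
  def reduceByKeyLocal(parts: Seq[Partition], f: (Int, Int) => Int): Map[String, Int] =
    parts.flatMap(p => combineWithin(p, f))
      .groupBy(_._1)
      .map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
}
```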
>>>>>>> I have a 340-node YARN cluster with 14 GB of RAM on each node, and the
>>>>>>> input data is just 100 MB. (Hadoop takes 2.5 hours on a 1 TB dataset.)
>>>>>>> ./bin/spark-submit -v --master yarn-cluster \
>>>>>>>   --jars /apache/hadoop-2.4.1-,/home/dvasthimal/spark1.4/lib/spark_reporting_dep_only-1.0-SNAPSHOT.jar \
>>>>>>>   --num-executors 330 --driver-memory 14g \
>>>>>>>   --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
>>>>>>>   --executor-memory 14g --executor-cores 1 --queue hdmi-others \
>>>>>>>   --class com.ebay.ep.poc.spark.reporting.SparkApp \
>>>>>>>   /home/dvasthimal/spark1.4/lib/spark_reporting-1.0-SNAPSHOT.jar \
>>>>>>>   startDate=2015-06-20 endDate=2015-06-21 \
>>>>>>>   input=/apps/hdmi-prod/b_um/epdatasets/exptsession \
>>>>>>>   subcommand=ppwmasterprime \
>>>>>>>   output=/user/dvasthimal/epdatasets/ppwmasterprime buffersize=128 \
>>>>>>>   maxbuffersize=1068 maxResultSize=200G
>>>>>>> I see this in the stdout of the task on that executor:
>>>>>>> 15/07/13 19:58:48 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
>>>>>>> 15/07/13 20:00:08 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (1 time so far)
>>>>>>> 15/07/13 20:01:31 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (2 times so far)
>>>>>>> 15/07/13 20:03:07 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (3 times so far)
>>>>>>> 15/07/13 20:04:32 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (4 times so far)
>>>>>>> 15/07/13 20:06:21 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (5 times so far)
>>>>>>> 15/07/13 20:08:09 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (6 times so far)
>>>>>>> 15/07/13 20:09:51 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (7 times so far)
>>>>>>> Also attached is the thread dump.
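The executor log above shows one thread spilling a 2.2 GB in-memory map seven times, roughly 15 GB written to disk, while the whole input is 100 MB. If that task belongs to a key-based stage (the reduceByKey or the join), two explanations fit the pattern. One is a few very heavy keys, since every record of a key lands in the same task. The other is records that grow a lot once deserialized. Below is a minimal local sketch for checking the first possibility on a sample of keys. KeySkew, heaviestKeys and topKeyShare are hypothetical names. On a Spark pair RDD, the same counts could come from `sample(false, 0.01)`, then `mapValues(_ => 1L).reduceByKey(_ + _)` and `top(20)(Ordering.by(_._2))`, which keeps the driver-side result small.

```scala
// Hypothetical local skew check on a sample of join/reduce keys.
object KeySkew {
  // Count records per key and return the n heaviest keys,
  // breaking ties alphabetically so the result is deterministic.
  def heaviestKeys(keys: Seq[String], n: Int): Seq[(String, Int)] =
    keys.groupBy(identity)
      .map { case (k, ks) => (k, ks.size) }
      .toSeq
      .sortBy { case (k, count) => (-count, k) }
      .take(n)

  // Fraction of all records that carry the single heaviest key.
  def topKeyShare(keys: Seq[String]): Double =
    if (keys.isEmpty) 0.0
    else heaviestKeys(keys, 1).head._2.toDouble / keys.size
}
```

If one key holds a large share of the sample, adding partitions will not shrink that task. Splitting the hot key (for example, by salting it) would be the usual direction to explore.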
>>>>>>> --
>>>>>>> Deepak
>>>>>> --
>>>>>> Deepak
>>>> --
>>>> Deepak
>>> --
>>> Deepak
>> --
>> Deepak
> --
> Deepak

