Here is the snippet of ExternalSorter.scala where the
ArrayIndexOutOfBoundsException was thrown:

    while (iterator.hasNext) {
      val partitionId = iterator.nextPartition()
      iterator.writeNext(partitionWriters(partitionId))
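    }

The array index reported in the exception is -78, meaning partitionId was
negative. In Java, the % operator returns a remainder with the same sign as
its left operand. Execute the following and examine the value of i: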

    int i = -78 % 40;

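You will see that your getPartition() method can return a negative value.
It should be refined so that it always returns a value from 0 to
numPartitions - 1, which prevents this exception.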
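
As a rough illustration of that refinement, here is a minimal, self-contained
sketch. The class name PartitionMath and the helper nonNegativeMod are
hypothetical names of mine, not part of Spark's public API, and I am assuming
the key returned by getGsMinusURL() fits in a long. It uses Math.floorMod
(Java 8+) so the result is never negative:

```java
final class PartitionMath {
    // Hypothetical helper: maps any key, negative or not, into [0, numPartitions).
    // Math.floorMod takes the sign of the divisor, unlike the % operator.
    static int nonNegativeMod(long key, int numPartitions) {
        return (int) Math.floorMod(key, (long) numPartitions);
    }

    public static void main(String[] args) {
        // Plain remainder keeps the sign of the dividend.
        System.out.println(-78 % 40);                  // prints -38
        // floorMod yields a valid array index.
        System.out.println(nonNegativeMod(-78L, 40));  // prints 2
    }
}
```

Under those assumptions, getPartition() could return
nonNegativeMod(newKey.getGsMinusURL(), numPartitions) in place of the bare %
expression. On Java 7, ((key % n) + n) % n should give the same result.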

On Thu, Sep 10, 2015 at 8:52 AM, Ashish Shenoy <ashe...@instartlogic.com>
wrote:

> I am using spark-1.4.1
>
> Here's the skeleton code:
>
> JavaPairRDD<NewKey, ExportObject> rddPair =
>   rdd.repartitionAndSortWithinPartitions(
>   new CustomPartitioner(), new ExportObjectComparator())
>     .persist(StorageLevel.MEMORY_AND_DISK_SER());
>
> ...
>
> @SuppressWarnings("serial")
> private static class CustomPartitioner extends Partitioner {
>   int numPartitions;
>   @Override
>   public int numPartitions() {
>     numPartitions = 40;
>     return numPartitions;
>   }
>
>   @Override
>   public int getPartition(Object o) {
>     NewKey newKey = (NewKey) o;
>     return (int) newKey.getGsMinusURL() % numPartitions;
>   }
> }
>
> ...
>
> @SuppressWarnings("serial")
> private static class ExportObjectComparator
>   implements Serializable, Comparator<NewKey> {
>   @Override
>   public int compare(NewKey o1, NewKey o2) {
>     if (o1.hits == o2.hits) {
>       return 0;
>     } else if (o1.hits > o2.hits) {
>       return -1;
>     } else {
>       return 1;
>     }
>   }
>
> }
>
> ...
>
>
>
> Thanks,
> Ashish
>
> On Wed, Sep 9, 2015 at 5:13 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Which release of Spark are you using?
>>
>> Can you show a skeleton of your partitioner and comparator?
>>
>> Thanks
>>
>>
>>
>> On Sep 9, 2015, at 4:45 PM, Ashish Shenoy <ashe...@instartlogic.com>
>> wrote:
>>
>> Hi,
>>
>> I am trying to sort an RDD pair using repartitionAndSortWithinPartitions()
>> for my key [which is a custom class, not a Java primitive] using a custom
>> partitioner on that key and a custom comparator. However, it fails
>> consistently:
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 18 in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in
>> stage 1.0 (TID 202, 172.16.18.25):
>> java.lang.ArrayIndexOutOfBoundsException: -78
>>         at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
>>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
>>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>         at scala.Option.foreach(Option.scala:236)
>>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
>>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>> I also persist the RDD using the "memory and disk" storage level. The
>> stack trace above comes from Spark's code and not my application code. Can
>> you please point out what I am doing wrong?
>>
>> Thanks,
>> Ashish
>>
>>
>
