I'm not sure the error is actually resolved. On further probing, the setting spark.memory.offHeap.enabled is not being applied in this build: when I print its value from core/src/main/scala/org/apache/spark/memory/MemoryManager.scala, it comes back false even though the webUI indicates it has been set.
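As a sanity check, something like the following (an untested sketch using the standard SparkConf.getAll API; sc is the live SparkContext) should list every off-heap key the conf actually holds, which would show whether the key name registered matches the one MemoryManager reads:

    // Dump every conf entry whose key mentions "offHeap", to see
    // exactly which key names made it into the SparkConf.
    sc.getConf.getAll
      .filter { case (k, _) => k.toLowerCase.contains("offheap") }
      .foreach { case (k, v) => println(s"$k = $v") }

The instrumented code in MemoryManager.scala looks like this: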
    /**
     * Tracks whether Tungsten memory will be allocated on the JVM heap
     * or off-heap using sun.misc.Unsafe.
     */
    final val tungstenMemoryMode: MemoryMode = {
      println("-- spark.memory.offHeap.enabled: " +
        conf.getBoolean("spark.memory.offHeap.enabled", false))
      println("-- spark.memory.offHeap.size: " +
        conf.getSizeAsBytes("spark.memory.offHeap.size"))
      if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
        require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
          "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
        MemoryMode.OFF_HEAP
      } else {
        MemoryMode.ON_HEAP
      }
    }

This code change prints:

    -- spark.memory.offHeap.enabled: false
    -- spark.memory.offHeap.size: 1073741824

while my webUI shows:

    spark.memory.unsafe.offHeap    true
    spark.memory.offHeap.size      1024M

I am not familiar with how the config manager in Spark works, but I believe there is an easy fix for this. Could you suggest a change?

~Mayuresh

On Mon, Dec 21, 2015 at 1:46 PM, Mayuresh Kunjir <mayur...@cs.duke.edu> wrote:

> Thanks Ted. That stack trace is from the 1.5.1 build.
>
> I tried on the latest code as you suggested. Memory management seems to
> have changed quite a bit, and this error has been fixed as well. :)
>
> Thanks for the help!
> Regards,
> ~Mayuresh
>
> On Mon, Dec 21, 2015 at 10:10 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> w.r.t.
>>
>> at org.apache.spark.sql.execution.UnsafeExternalRowSorter$RowComparator.compare(UnsafeExternalRowSorter.java:202)
>>
>> I looked at UnsafeExternalRowSorter.java in 1.6.0, which only has 192
>> lines of code.
>> Can you run with the latest RC of 1.6.0 and paste the stack trace?
>> Thanks
>>
>> On Thu, Dec 17, 2015 at 5:04 PM, Mayuresh Kunjir <mayur...@cs.duke.edu> wrote:
>>
>>> I am testing a simple Sort program written using the DataFrame API.
>>> When I enable spark.unsafe.offHeap, the output stage fails with an NPE.
>>> The exception when run on spark-1.5.1 is copied below.
>>>
>>> Job aborted due to stage failure: Task 23 in stage 3.0 failed 4 times,
>>> most recent failure: Lost task 23.3 in stage 3.0 (TID 667, xeno-40):
>>> java.lang.NullPointerException
>>>
>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering.compare(Unknown Source)
>>> at org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering.compare(GenerateOrdering.scala:28)
>>> at org.apache.spark.sql.execution.UnsafeExternalRowSorter$RowComparator.compare(UnsafeExternalRowSorter.java:202)
>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:58)
>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:35)
>>> at org.apache.spark.util.collection.TimSort.binarySort(TimSort.java:191)
>>> at org.apache.spark.util.collection.TimSort.sort(TimSort.java:129)
>>> at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:190)
>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:202)
>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:347)
>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:332)
>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:399)
>>> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:92)
>>> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174)
>>> at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$executePartition$1(sort.scala:160)
>>> at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$4.apply(sort.scala:169)
>>> at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$4.apply(sort.scala:169)
>>> at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> My program looks as follows:
>>>
>>> case class Data(key: String, value: String)
>>>
>>> val lines = sc.textFile(args(0), 1)
>>> val data = lines.map(_.split(" ")).map(t => Data(t(0), t(1))).toDF()
>>> data.registerTempTable("data")
>>> val sorted = data.sort("key")
>>> sorted.save(args(1))
>>>
>>> I am running the program on YARN v2.6 and have tried spark-1.5.1 as
>>> well as the current snapshot of spark-1.6.0.
>>>
>>> Thanks and regards,
>>> ~Mayuresh
>>
>
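For reference, this is a minimal sketch of how the key names read by the MemoryManager snippet at the top of this thread would be set when constructing the context. The key names come from that snippet; the app name and size value are placeholders, not from this thread:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: set the exact keys MemoryManager reads. Per the require()
    // in the snippet above, spark.memory.offHeap.size must be > 0
    // whenever spark.memory.offHeap.enabled is true.
    val conf = new SparkConf()
      .setAppName("SortTest") // placeholder name
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "1g")
    val sc = new SparkContext(conf)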