[ https://issues.apache.org/jira/browse/SPARK-22438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-22438. ------------------------------- Resolution: Duplicate Have a look through JIRA first. This looks like https://issues.apache.org/jira/browse/SPARK-21033 > OutOfMemoryError on very small data sets > ---------------------------------------- > > Key: SPARK-22438 > URL: https://issues.apache.org/jira/browse/SPARK-22438 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 2.2.0 > Reporter: Morten Hornbech > Priority: Critical > > We have a customer that uses Spark as an engine for running SQL on a > collection of small datasets, typically no greater than a few thousand rows. > Recently we started observing out-of-memory errors on some new workloads. > Even though the datasets were only a few kilobytes, the job would almost > immediately spike to > 10GB of memory usage, producing an out-of-memory error > on the modest hardware (2 CPUs, 16 RAM) that is used. Using larger hardware > and allocating more memory to Spark (4 CPUs, 32 RAM) made the job complete, > but still with an unreasonable high memory usage. > The query involved was a left join on two datasets. In some, but not all, > cases we were able to remove or reduce the problem by rewriting the query to > use an exists sub-select instead. After a lot of debugging we were able to > reproduce the problem locally with the following test: > {code:java} > case class Data(value: String) > val session = SparkSession.builder.master("local[1]").getOrCreate() > import session.implicits._ > val foo = session.createDataset((1 to 500).map(i => Data(i.toString))) > val bar = session.createDataset((1 to 1).map(i => Data(i.toString))) > foo.persist(StorageLevel.MEMORY_ONLY) > foo.createTempView("foo") > bar.createTempView("bar") > val result = session.sql("select * from bar left join foo on bar.value = > foo.value") > result.coalesce(2).collect() > {code} > Running this produces the error below: > {code:java} > java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0 > at > org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:127) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:372) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) > at > org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83) > at > org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:774) > at > org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:649) > at > org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:198) > at > org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:136) > at > org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100) > at > org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > The exact failure point varies with the number of threads given to spark, the > "coalesce" value and the number of rows in "foo". Using an inner join, > removing the call to persist, removing the call to coalease (or using > repartition) will all independently make the error go away. > The reason persist and coalesce are used in the workload at all is because it > is part of a more general Spark-based processing engine, not limited to these > small datasets. Therefore the workaround is not a simple as it may seem, > since we cannot tailor the Spark code to this specific case. -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org