I am trying to run a Spark job which reads from Elasticsearch and should
write its output back to a separate Elasticsearch index. Unfortunately I
keep getting `java.lang.OutOfMemoryError: Java heap space` exceptions. I've
tried running it with:

--conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=2147483648 --conf spark.executor.memory=4g

That didn't help though.

I'm running Spark 2.0.0 on 55 worker nodes, against Elasticsearch 2.3.3,
with Scala 2.11.8 and Java 1.8.0_60.
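
For context, the read and write side of the job uses the elasticsearch-spark
connector in the usual way. A minimal sketch of that pattern is below; the index
names, field, and aggregation are illustrative only, the real code is in the
compute-top-unique-users.scala gist linked further down, and `sc` is the
spark-shell SparkContext:

import org.elasticsearch.spark._  // elasticsearch-spark connector: adds esRDD / saveToEs

// Sketch only: read from the source index, derive one record per unique author,
// and write those records to a separate index. All names here are illustrative.
val content = sc.esRDD("posts_2016_11_29_to_2016_12_05/post")  // RDD[(docId, Map[field, value])]

val unique_authors = content
  .flatMap { case (_, doc) => doc.get("author") }
  .distinct()
  .map(author => Map("user" -> author))

unique_authors.saveToEs("top_users_2016_11_29_to_2016_12_05/user")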

scala> unique_authors.saveToEs("top_users_2016_11_29_to_2016_12_05/user")
[Stage 1:>                                                     (0 + 108) / 2048]16/12/06 03:19:40 WARN TaskSetManager: Lost task 78.0 in stage 1.0 (TID 148, 136.243.58.230): java.lang.OutOfMemoryError: Java heap space
        at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:657)
        at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
        at org.apache.spark.sql.execution.TakeOrderedAndProjectExec$$anonfun$4.apply(limit.scala:143)
        at org.apache.spark.sql.execution.TakeOrderedAndProjectExec$$anonfun$4.apply(limit.scala:142)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
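
For what it's worth, the frames above are inside TakeOrderedAndProjectExec, the
physical operator Spark 2.0 uses for an orderBy followed by a limit; each task
then collects its candidate top-N rows on the heap via Guava's Ordering.leastOf.
A rough sketch of the kind of DataFrame code that compiles to that operator
(column names, the aggregation, and the limit are illustrative, not the actual
script):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Sketch only: `contentDF` stands in for the DataFrame built from the Elasticsearch
// input. An orderBy + limit like the one below is planned as TakeOrderedAndProjectExec,
// which is where the Ordering.leastOf / Utils.takeOrdered frames above come from.
def topAuthors(contentDF: DataFrame): DataFrame =
  contentDF
    .groupBy(col("author"))
    .count()                       // one "count" column per author
    .orderBy(col("count").desc)
    .limit(10000)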

Links to logs:
Spark-shell log: https://gist.github.com/lakomiec/e53f8e3f0a7227f751978f5ad95b6c52
Content of compute-top-unique-users.scala: https://gist.github.com/lakomiec/23e221131554fc9e726f7d6cdc5b88b5
Exception on worker node: https://gist.github.com/lakomiec/560ab486eed981fd864086189afb413e


... one additional thing to add.

We tried:

content = content.persist(StorageLevel.MEMORY_AND_DISK)

but that didn't seem to have any impact...
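
Spelled out (with the import it needs), the attempt looks roughly like this.
This is a sketch only, with an illustrative source index name; `content` stands
in for the dataset read from Elasticsearch in the script linked above:

import org.apache.spark.storage.StorageLevel
import org.elasticsearch.spark._

// Sketch only: cache the Elasticsearch input so cached blocks can spill to disk
// instead of living purely on the executor heap. The index name is illustrative.
var content = sc.esRDD("posts_2016_11_29_to_2016_12_05/post")
content = content.persist(StorageLevel.MEMORY_AND_DISK)

As far as I understand, persisting only changes how the cached input blocks are
stored; it wouldn't reduce the per-task memory used by the takeOrdered step in
the trace above, which might be why it had no visible effect.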

