If you are running Spark with local[*] as the master, there is a single
JVM process (the executors run inside the driver JVM), and its memory is
controlled by the --driver-memory command-line option to spark-submit. See

http://spark.apache.org/docs/latest/configuration.html

spark.driver.memory (default: 1g)
Amount of memory to use for the driver process, i.e. where SparkContext is
initialized (e.g. 1g, 2g).
*Note:* In client mode, this config must not be set through the SparkConf
directly in your application, because the driver JVM has already started at
that point. Instead, please set this through the --driver-memory command line
option or in your default properties file.
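
As a quick sanity check, here is a minimal sketch that prints the heap the
JVM actually received. The class name HeapCheck and the 4096 MiB threshold
are my own assumptions for illustration (the threshold mirrors the "4G" in
your SparkConf); it uses only the standard Runtime.maxMemory() call. Since
everything runs in one JVM in local mode, this figure is the real limit, no
matter what spark.driver.memory says in SparkConf.

```java
// Hypothetical helper (not from this thread): report the heap the JVM really got.
final class HeapCheck {

    // Convert a byte count to whole mebibytes.
    static long toMiB(long bytes) {
        return bytes / (1024L * 1024L);
    }

    // True if the running JVM's maximum heap is at least the requested size.
    static boolean heapAtLeast(long requiredMiB) {
        return toMiB(Runtime.getRuntime().maxMemory()) >= requiredMiB;
    }

    public static void main(String[] args) {
        long maxMiB = toMiB(Runtime.getRuntime().maxMemory());
        System.out.println("Max heap available to this JVM: " + maxMiB + " MiB");
        // 4096 MiB is an assumed threshold matching the "4G" setting quoted below.
        // maxMemory() can report somewhat less than -Xmx, so treat this as a hint.
        if (!heapAtLeast(4096)) {
            System.out.println("Heap is below 4G; the memory setting may not have taken effect.");
        }
    }
}
```

Calling the same check from inside your service, just before the Spark
session is built, should show whether -Xmx (or --driver-memory) reached the
JVM at all.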

Thanks,
Sonal
Nube Technologies <http://www.nubetech.co>

<http://in.linkedin.com/in/sonalgoyal>



On Tue, Nov 14, 2017 at 9:37 AM, Alec Swan <alecs...@gmail.com> wrote:

> Hi Joel,
>
> Here are the relevant snippets of my code and an OOM error thrown
> in frameWriter.save(..). Surprisingly, the heap dump is pretty small ~60MB
> even though I am running with -Xmx10G and 4G executor and driver memory as
> shown below.
>
>         SparkConf sparkConf = new SparkConf()
>                 .setAppName("My Service")
>                 .setMaster("local[*]")
>                 .set("spark.ui.enabled", "true")
>                 .set("spark.executor.memory", "4G")
>                 .set("spark.driver.memory", "4G");
>
>         sparkSessionBuilder = SparkSession.builder()
>                 .config(sparkConf)
>                 .enableHiveSupport();
>
>         Dataset<Row> events = sparkSession.read()
>                 .format("json")
>                 .schema(inputConfig.getSchema())
>                 .load(inputFile.getPath());
>
>         DataFrameWriter<Row> frameWriter = events
>                 // select "data.customer AS `customer`", ...
>                 .selectExpr(JavaConversions.asScalaBuffer(outputSchema.getColumns()))
>                 .write()
>                 .options(outputConfig.getProperties()) // compression=zlib
>                 .format("orc")
>                 // partition by "customer"
>                 .partitionBy(JavaConversions.asScalaBuffer(outputSchema.getPartitions()))
>                 .save(outputUri.getPath());
>
>
> Here is the error log I get at runtime:
>
> 17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
> 17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
> java.lang.OutOfMemoryError: Java heap space
> Dumping heap to java_pid3790.hprof ...
> Heap dump file created [62653841 bytes in 2.212 secs]
> #
> # java.lang.OutOfMemoryError: Java heap space
> # -XX:OnOutOfMemoryError="kill -9 %p"
> #   Executing "kill -9 3790"...
>
>
> And here is the thread from the thread dump that caused OOM:
>
> "Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
> at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:48)
> at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
> at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
> at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
> at java.io.InputStream.read(InputStream.java:101)
> at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>    Local Variable: byte[]#3957
>    Local Variable: org.apache.hadoop.io.compress.BlockDecompressorStream#1
> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>    Local Variable: org.apache.hadoop.mapreduce.lib.input.SplitLineReader#1
>    Local Variable: org.apache.hadoop.io.Text#5
> at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
> at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
>    Local Variable: org.apache.hadoop.mapreduce.lib.input.LineRecordReader#1
> at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>    Local Variable: org.apache.spark.sql.execution.datasources.RecordReaderIterator#1
> at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
>    Local Variable: org.apache.spark.sql.execution.datasources.HadoopFileLinesReader#1
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>    Local Variable: scala.collection.Iterator$$anon$12#1
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
>    Local Variable: org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1#1
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
> at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
>    Local Variable: org.apache.spark.sql.execution.UnsafeExternalRowSorter#1
>    Local Variable: org.apache.spark.executor.TaskMetrics#2
> at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
>    Local Variable: org.apache.spark.sql.execution.SortExec$$anonfun$1#2
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>    Local Variable: scala.collection.Iterator$$anon$11#2
>    Local Variable: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25#2
>    Local Variable: java.lang.Integer#1
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>    Local Variable: org.apache.spark.sql.execution.datasources.FilePartition#2
>    Local Variable: org.apache.spark.storage.StorageLevel#1
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>    Local Variable: org.apache.spark.rdd.MapPartitionsRDD#4
>    Local Variable: org.apache.spark.serializer.JavaSerializerInstance#4
>    Local Variable: scala.Tuple2#1572
>    Local Variable: org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1#2
>    Local Variable: scala.Tuple2#1571
>    Local Variable: org.apache.spark.TaskContextImpl#1
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
>    Local Variable: org.apache.spark.scheduler.ResultTask#2
>    Local Variable: org.apache.spark.metrics.MetricsSystem#1
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>    Local Variable: org.apache.spark.serializer.JavaSerializerInstance#5
>    Local Variable: org.apache.spark.memory.TaskMemoryManager#1
>    Local Variable: sun.management.ThreadImpl#1
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>    Local Variable: java.util.concurrent.ThreadPoolExecutor#6
>    Local Variable: org.apache.spark.executor.Executor$TaskRunner#1
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>    Local Variable: java.util.concurrent.ThreadPoolExecutor$Worker#26
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Thanks,
>
> Alec
>
> On Mon, Nov 13, 2017 at 8:30 PM, Joel D <games2013....@gmail.com> wrote:
>
>> Have you tried increasing driver and executor memory (and GC overhead too,
>> if required)?
>>
>> Your code snippet and stack trace would be helpful.
>>
>> On Mon, Nov 13, 2017 at 7:23 PM Alec Swan <alecs...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
>>> format. Effectively, my Java service starts up an embedded Spark cluster
>>> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
>>> keep getting OOM errors with large (~1GB) files.
>>>
>>> I've tried different ways to reduce memory usage, e.g. by partitioning
>>> data with dataSet.partitionBy("customer").save(filePath), or capping
>>> memory usage by setting spark.executor.memory=1G, but to no avail.
>>>
>>> I am wondering if there is a way to avoid OOM besides splitting the
>>> source JSON file into multiple smaller ones and processing the small ones
>>> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
>>> in its entirety before converting it to ORC (columnar)? If so, would it
>>> make sense to create a custom receiver that reads the Snappy file and use
>>> Spark streaming for ORC conversion?
>>>
>>> Thanks,
>>>
>>> Alec
>>>
>>>
>>>
>>>
>>>
>
