Re: OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Piyush Acharya
Please try the maxBytesPerTrigger option; the files are probably big enough to
crash the JVM.
Please also give some info on the executors and the files (size etc.).
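
For reference, a minimal sketch of how such a size-based cap would be passed on
the read side (illustrative only: maxBytesPerTrigger is the option name suggested
above, and whether the built-in json/text file source honors it depends on the
Spark version; maxFilesPerTrigger is the rate-limit option documented for the
file source):

    Dataset<Row> inputDS = sparkSession.readStream()
        .schema(jsonSchema)
        // Size-based cap per micro-batch, where the source supports it.
        .option("maxBytesPerTrigger", "512m")
        .json(inputPath + "/*");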

Regards,
..Piyush

On Sun, Jul 19, 2020 at 3:29 PM Rachana Srivastava
 wrote:

> *Issue:* I am trying to periodically process 5000+ gzipped JSON files
> from S3 using Structured Streaming code.
>
> *Here are the key steps:*
>
>    1. Read the JSON schema and broadcast it to the executors.
>
>    2. Read the stream:
>
>       Dataset<Row> inputDS = sparkSession.readStream()
>           .format("text")
>           .option("inferSchema", "true")
>           .option("header", "true")
>           .option("multiLine", true)
>           .schema(jsonSchema)
>           .option("mode", "PERMISSIVE")
>           .json(inputPath + "/*");
>
>    3. Process each file in a map:
>
>       Dataset<String> ds = inputDS.map(x -> { ... }, Encoders.STRING());
>
>    4. Write the output to S3:
>
>       StreamingQuery query = ds.coalesce(1)
>           .writeStream()
>           .outputMode("append")
>           .format("csv")
>           ...
>           .start();
>
> *maxFilesPerTrigger* is set to 500, so I was hoping the stream would
> pick up only that many files per trigger. Why are we getting OOM? If we
> have more than 3500 files, the system crashes with OOM.
>
>


Re: OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Sanjeev Mishra
Can you reduce maxFilesPerTrigger further and see whether the OOM still persists?
If it does, then the problem may be somewhere else.
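
For instance (illustrative values only), a smaller cap on the read side would
look like:

    Dataset<Row> inputDS = sparkSession.readStream()
        .schema(jsonSchema)
        // Try a smaller micro-batch than 500 files, e.g. 100 or 50.
        .option("maxFilesPerTrigger", 100)
        .json(inputPath + "/*");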

> On Jul 19, 2020, at 5:37 AM, Jungtaek Lim  
> wrote:
> 
> Please provide logs and a heap dump file for the OOM case; otherwise no one can
> say what the cause is.
> 
> Add these JVM options to the driver/executor: -XX:+HeapDumpOnOutOfMemoryError
> -XX:HeapDumpPath="...dir..."
> 
> On Sun, Jul 19, 2020 at 6:56 PM Rachana Srivastava 
>  wrote:
> Issue: I am trying to periodically process 5000+ gzipped JSON files from S3
> using Structured Streaming code.
>
> Here are the key steps:
>
> 1. Read the JSON schema and broadcast it to the executors.
>
> 2. Read the stream:
>
>    Dataset<Row> inputDS = sparkSession.readStream()
>        .format("text")
>        .option("inferSchema", "true")
>        .option("header", "true")
>        .option("multiLine", true)
>        .schema(jsonSchema)
>        .option("mode", "PERMISSIVE")
>        .json(inputPath + "/*");
>
> 3. Process each file in a map:
>
>    Dataset<String> ds = inputDS.map(x -> { ... }, Encoders.STRING());
>
> 4. Write the output to S3:
>
>    StreamingQuery query = ds.coalesce(1)
>        .writeStream()
>        .outputMode("append")
>        .format("csv")
>        ...
>        .start();
>
> maxFilesPerTrigger is set to 500, so I was hoping the stream would pick up only
> that many files per trigger. Why are we getting OOM? If we have more than 3500
> files, the system crashes with OOM.
> 
> 



Re: OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Jungtaek Lim
Please provide logs and a heap dump file for the OOM case; otherwise no one can
say what the cause is.

Add these JVM options to the driver/executor: -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath="...dir..."
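
For example, with spark-submit these can be passed through the standard
extraJavaOptions configs (the dump path below is only a placeholder; it must be
a directory that exists and is writable on every node):

    spark-submit \
      --conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/heapdumps" \
      --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/heapdumps" \
      ...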

On Sun, Jul 19, 2020 at 6:56 PM Rachana Srivastava
 wrote:

> *Issue:* I am trying to periodically process 5000+ gzipped JSON files
> from S3 using Structured Streaming code.
>
> *Here are the key steps:*
>
>    1. Read the JSON schema and broadcast it to the executors.
>
>    2. Read the stream:
>
>       Dataset<Row> inputDS = sparkSession.readStream()
>           .format("text")
>           .option("inferSchema", "true")
>           .option("header", "true")
>           .option("multiLine", true)
>           .schema(jsonSchema)
>           .option("mode", "PERMISSIVE")
>           .json(inputPath + "/*");
>
>    3. Process each file in a map:
>
>       Dataset<String> ds = inputDS.map(x -> { ... }, Encoders.STRING());
>
>    4. Write the output to S3:
>
>       StreamingQuery query = ds.coalesce(1)
>           .writeStream()
>           .outputMode("append")
>           .format("csv")
>           ...
>           .start();
>
> *maxFilesPerTrigger* is set to 500, so I was hoping the stream would
> pick up only that many files per trigger. Why are we getting OOM? If we
> have more than 3500 files, the system crashes with OOM.
>
>


OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Rachana Srivastava
Issue: I am trying to periodically process 5000+ gzipped JSON files from S3
using Structured Streaming code.

Here are the key steps:

   - Read the JSON schema and broadcast it to the executors.

   - Read the stream:

     Dataset<Row> inputDS = sparkSession.readStream()
         .format("text")
         .option("inferSchema", "true")
         .option("header", "true")
         .option("multiLine", true)
         .schema(jsonSchema)
         .option("mode", "PERMISSIVE")
         .json(inputPath + "/*");

   - Process each file in a map:

     Dataset<String> ds = inputDS.map(x -> { ... }, Encoders.STRING());

   - Write the output to S3:

     StreamingQuery query = ds.coalesce(1)
         .writeStream()
         .outputMode("append")
         .format("csv")
         ...
         .start();

maxFilesPerTrigger is set to 500, so I was hoping the stream would pick up only
that many files per trigger. Why are we getting OOM? If we have more than 3500
files, the system crashes with OOM.
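
The first step above is not shown in code; a minimal sketch of one way it could
be done (the field names below are placeholders, not the actual schema) is:

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // Build (or load) the JSON schema; these fields are placeholders.
    StructType jsonSchema = new StructType()
        .add("id", DataTypes.StringType)
        .add("payload", DataTypes.StringType);

    // Broadcast the schema so executor-side code (e.g. the map function)
    // can reuse it without shipping it with every task closure.
    JavaSparkContext jsc =
        JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
    Broadcast<StructType> schemaBroadcast = jsc.broadcast(jsonSchema);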