Re: OOM while processing read/write to S3 using Spark Structured Streaming
Please try the maxBytesPerTrigger option; the files are probably big enough to crash the JVM. Please also share some info on the executors and on the files (size, etc.).

Regards,
..Piyush

On Sun, Jul 19, 2020 at 3:29 PM Rachana Srivastava wrote:
> Issue: I am trying to process 5000+ gzipped JSON files periodically from
> S3 using Structured Streaming code. [...]
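One caveat worth noting: gzipped files are not splittable, so each file is decompressed in full by a single task, and a few very large files can exhaust one executor's heap regardless of the per-trigger file count. For reference, rate-limit options are set on the streaming reader. A minimal sketch, assuming a source that actually supports maxBytesPerTrigger (the built-in file source in many Spark versions honors only maxFilesPerTrigger; maxBytesPerTrigger is available for sources such as Delta Lake), with the limit values picked arbitrarily:

    Dataset<Row> inputDS = sparkSession.readStream()
        .schema(jsonSchema)
        .option("maxFilesPerTrigger", 100)      // cap on the number of files per micro-batch
        .option("maxBytesPerTrigger", "256m")   // cap on total input bytes, where the source supports it
        .json(inputPath + "/*");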
Re: OOM while processing read/write to S3 using Spark Structured Streaming
Can you reduce maxFilesPerTrigger further and see if the OOM still persists? If it does, the problem may be somewhere else.

> On Jul 19, 2020, at 5:37 AM, Jungtaek Lim wrote:
>
> Please provide logs and a dump file for the OOM case - otherwise no one
> can say what the cause is. [...]
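To confirm that the cap is actually taking effect, you can inspect the progress of each micro-batch; StreamingQueryProgress reports how many rows a trigger pulled in. A rough sketch, where query is the StreamingQuery handle from the original code:

    // After at least one micro-batch has completed:
    org.apache.spark.sql.streaming.StreamingQueryProgress progress = query.lastProgress();
    if (progress != null) {
        System.out.println("Rows in last micro-batch: " + progress.numInputRows());
    }

If the reported batch sizes shrink as maxFilesPerTrigger is lowered but the OOM remains, that points away from batch sizing and toward something like a single oversized gzip file.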
Re: OOM while processing read/write to S3 using Spark Structured Streaming
Please provide logs and a dump file for the OOM case - otherwise no one can say what the cause is.

Add JVM options to driver/executor => -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath="...dir..."

On Sun, Jul 19, 2020 at 6:56 PM Rachana Srivastava wrote:
> Issue: I am trying to process 5000+ gzipped JSON files periodically from
> S3 using Structured Streaming code. [...]
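In Spark these flags are normally passed via the extraJavaOptions confs at submit time; the driver option in particular cannot be set programmatically, since the driver JVM has already started by then. A sketch, with /tmp/heapdumps as a placeholder directory that must exist on every node:

    spark-submit \
      --conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/heapdumps" \
      --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/heapdumps" \
      ...remaining arguments as usual...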
OOM while processing read/write to S3 using Spark Structured Streaming
Issue: I am trying to process 5000+ gzipped JSON files periodically from S3 using Structured Streaming code.

Here are the key steps:

- Read the JSON schema and broadcast it to the executors
- Read the stream:

    Dataset<Row> inputDS = sparkSession.readStream()
        .format("text")
        .option("inferSchema", "true")
        .option("header", "true")
        .option("multiLine", true)
        .schema(jsonSchema)
        .option("mode", "PERMISSIVE")
        .json(inputPath + "/*");

- Process each file in a map:

    Dataset<String> ds = inputDS.map(x -> { ... }, Encoders.STRING());

- Write output to S3:

    StreamingQuery query = ds
        .coalesce(1)
        .writeStream()
        .outputMode("append")
        .format("csv")
        ...
        .start();

maxFilesPerTrigger is set to 500, so I was hoping the stream would pick up only that many files to process. Why are we getting OOM? If we have more than 3500 files, the system crashes with OOM.
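For anyone trying to reproduce this, below is a self-contained sketch of the pipeline as described, assuming placeholder S3 paths, a placeholder two-column schema, and a trivial map body standing in for the elided one. The format("text"), inferSchema, and header options from the original are dropped here: the trailing .json(...) call overrides the format, and inferSchema/header are CSV reader options with no effect on a JSON read.

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.types.StructType;

    public class GzJsonToCsvStream {
        public static void main(String[] args) throws Exception {
            SparkSession sparkSession = SparkSession.builder()
                .appName("gz-json-to-csv")
                .getOrCreate();

            // Placeholder schema; the original reads and broadcasts a real one.
            StructType jsonSchema = new StructType()
                .add("id", "string")
                .add("payload", "string");

            String inputPath = "s3a://bucket/input";       // placeholder
            String outputPath = "s3a://bucket/output";     // placeholder
            String checkpointPath = "s3a://bucket/chk";    // placeholder

            Dataset<Row> inputDS = sparkSession.readStream()
                .schema(jsonSchema)
                .option("multiLine", true)
                .option("mode", "PERMISSIVE")
                .option("maxFilesPerTrigger", 500)  // the cap mentioned in the question
                .json(inputPath + "/*");

            // Trivial per-record transform standing in for the original map body.
            Dataset<String> ds = inputDS.map(
                (MapFunction<Row, String>) row -> row.mkString(","),
                Encoders.STRING());

            StreamingQuery query = ds.coalesce(1)
                .writeStream()
                .outputMode("append")
                .format("csv")
                .option("path", outputPath)
                .option("checkpointLocation", checkpointPath)
                .start();

            query.awaitTermination();
        }
    }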