Thanks, it worked.

On Thu, Mar 7, 2019 at 5:05 AM Akshay Bhardwaj <akshay.bhardwaj1...@gmail.com> wrote:
> Hi,
>
> In your spark-submit command, try using the below config property and see if this solves the problem.
>
> --conf spark.sql.files.ignoreCorruptFiles=true
>
> For me, this worked to ignore empty/partially uploaded gzip files in an S3 bucket.
>
> Akshay Bhardwaj
> +91-97111-33849
>
> On Thu, Mar 7, 2019 at 11:28 AM Lian Jiang <jiangok2...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a structured streaming job that listens to an HDFS folder containing jsonl.gz files. The job crashed due to this error:
>>
>> java.io.IOException: incorrect header check
>>   at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
>>   at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
>>   at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
>>   at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
>>   at java.io.InputStream.read(InputStream.java:101)
>>   at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
>>   at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
>>   at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
>>   at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
>>   at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
>>   at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>>   at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
>>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
>>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> Is there a way to skip the gz files that cannot be decompressed? Exception handling does not seem to help. The only workaround I can think of is to decompress the gz files into another folder first and make the Spark streaming job listen to this new folder.
>> But this workaround may not be better than the alternative of using an unstructured streaming job to directly decompress the gz files, read the jsonl files, validate the records, and write the validated records to parquet.
>>
>> Any idea is highly appreciated!
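
For reference, a minimal sketch of how the suggested config could be applied in a structured streaming job that reads jsonl.gz files from HDFS. Only spark.sql.files.ignoreCorruptFiles comes from the thread above; the paths, schema fields, checkpoint location, and object name are illustrative assumptions.

// Minimal sketch, not from the original thread: skip corrupt gz files while
// streaming jsonl.gz from an HDFS folder. All paths and field names are assumed.
//
// Submit with, for example:
//   spark-submit --conf spark.sql.files.ignoreCorruptFiles=true --class SkipCorruptGzSketch my-job.jar

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object SkipCorruptGzSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("skip-corrupt-gz")
      .getOrCreate()

    // The same property can also be set programmatically instead of on spark-submit.
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

    // Streaming file sources need an explicit schema; these fields are placeholders.
    val schema = new StructType()
      .add("id", StringType)
      .add("payload", StringType)

    val records = spark.readStream
      .schema(schema)
      .json("hdfs:///data/incoming")   // folder containing jsonl.gz files (assumed path)

    // Write the parsed records to parquet; checkpoint location is an assumed path.
    records.writeStream
      .format("parquet")
      .option("path", "hdfs:///data/validated")
      .option("checkpointLocation", "hdfs:///checkpoints/skip-corrupt-gz")
      .start()
      .awaitTermination()
  }
}

With this config enabled, Spark should continue the job when it encounters a corrupt file and return whatever contents could be read, rather than failing the task.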