Thanks, it worked.

On Thu, Mar 7, 2019 at 5:05 AM Akshay Bhardwaj <akshay.bhardwaj1...@gmail.com> wrote:

> Hi,
>
> In your spark-submit command, try using the config property below and see
> if it solves the problem.
>
> --conf spark.sql.files.ignoreCorruptFiles=true
>
> For me this worked to skip reading empty or partially uploaded gzip files
> in an S3 bucket.
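>
> If it helps, the same setting can also be applied programmatically before
> the query starts. A minimal sketch in Scala (the app name is just a
> placeholder; it assumes the rest of your job stays unchanged):
>
>   import org.apache.spark.sql.SparkSession
>
>   val spark = SparkSession.builder().appName("my-streaming-app").getOrCreate()
>   // Skip files that fail to decompress (e.g. truncated .gz) instead of
>   // failing the whole task; Spark logs a warning for each skipped file.
>   spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")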
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Thu, Mar 7, 2019 at 11:28 AM Lian Jiang <jiangok2...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a structured streaming job that listens to an HDFS folder
>> containing jsonl.gz files. The job crashed with the following error:
>>
>> java.io.IOException: incorrect header check
>>     at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
>>     at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
>>     at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
>>     at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
>>     at java.io.InputStream.read(InputStream.java:101)
>>     at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
>>     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
>>     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
>>     at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
>>     at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
>>     at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>>     at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
>>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
>>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:109)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>>
>> Is there a way to skip the gz files that cannot be decompressed?
>> Exception handling does not seem to help. The only workaround I can think
>> of is to decompress the gz files into another folder first and point the
>> Spark streaming job at that new folder. But that workaround may not be
>> better than a plain (non-structured-streaming) job that directly
>> decompresses the gz files, reads the jsonl, validates the records, and
>> writes the validated records to parquet.
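>>
>> In case it makes the setup clearer, the job is roughly of the following
>> shape (a rough sketch only; the paths, schema, and app name below are
>> placeholders, not the real ones):
>>
>>   import org.apache.spark.sql.SparkSession
>>   import org.apache.spark.sql.types._
>>
>>   val spark = SparkSession.builder().appName("jsonl-gz-stream").getOrCreate()
>>
>>   // A streaming file source needs an explicit schema
>>   val schema = new StructType()
>>     .add("id", LongType)
>>     .add("payload", StringType)
>>
>>   // The json source decompresses *.jsonl.gz automatically based on the
>>   // file extension, which is where the "incorrect header check" is thrown
>>   val df = spark.readStream
>>     .schema(schema)
>>     .json("hdfs:///data/incoming")
>>
>>   val query = df.writeStream
>>     .format("parquet")
>>     .option("path", "hdfs:///data/output")
>>     .option("checkpointLocation", "hdfs:///data/checkpoints")
>>     .start()
>>
>>   query.awaitTermination()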
>>
>> Any idea is highly appreciated!
>>
