Re: How to skip corrupted avro files
Thanks for the info!

Shing

On Tuesday, 5 May 2015, 15:11, Imran Rashid wrote:
> You might be interested in https://issues.apache.org/jira/browse/SPARK-6593
> and the discussion around the PRs.
>
> This is probably more complicated than what you are looking for, but you
> could copy the code for HadoopReliableRDD in the PR into your own code and
> use it, without having to wait for the issue to get resolved.
Re: How to skip corrupted avro files
You might be interested in https://issues.apache.org/jira/browse/SPARK-6593 and the discussion around the PRs.

This is probably more complicated than what you are looking for, but you could copy the code for HadoopReliableRDD from the PR into your own code and use it, without having to wait for the issue to be resolved.

On Sun, May 3, 2015 at 12:57 PM, Shing Hing Man wrote:
> Hi,
> I am using Spark 1.3.1 to read a directory of about 2000 avro files.
> The avro files are from a third party and a few of them are corrupted.
> [...]
> Is there an easy way to skip a corrupted avro file without reading the
> files one by one using sqlContext.avroFile(file) ?
> [...]
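For readers who want the "skip the whole file" behaviour asked about above, the general idea can be sketched in plain Scala without Spark. This is not the code from the PR; readSkippingCorrupt and the (name, open) pairs are hypothetical names invented for this illustration, and the sketch assumes each source is small enough to hold in memory.

```scala
import scala.util.{Failure, Success, Try}

object SkipCorruptSources {
  // Hypothetical helper (not part of Spark or Avro): each source is a name
  // plus a function that opens it and yields its records. Opening or
  // iterating may throw, for example on a corrupted file.
  def readSkippingCorrupt[A](
      sources: Seq[(String, () => Iterator[A])]
  ): (Vector[A], Vector[String]) =
    sources.foldLeft((Vector.empty[A], Vector.empty[String])) {
      case ((records, skipped), (name, open)) =>
        // Read the whole source inside Try, so a failure part-way through
        // discards every record of that source rather than just the tail.
        Try(open().toVector) match {
          case Success(rs) => (records ++ rs, skipped)
          case Failure(_)  => (records, skipped :+ name)
        }
    }
}
```

The second element of the result lists the names of the sources that were skipped, so the corrupted files can be reported instead of being dropped silently.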
How to skip corrupted avro files
Hi,

I am using Spark 1.3.1 to read a directory of about 2000 Avro files. The Avro files come from a third party, and a few of them are corrupted.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    // Assumed: avroFile is provided by the spark-avro package.
    import com.databricks.spark.avro._

    val path = "{myDirectory of avro files}"

    val sparkConf = new SparkConf().setAppName("avroDemo").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val sqlContext = new SQLContext(sc)

    val data = sqlContext.avroFile(path)
    data.select(.)

When I run the above code, I get the following exception.

org.apache.avro.AvroRuntimeException: java.io.IOException: Invalid sync!
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:222) ~[classes/:1.7.7]
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:129) ~[spark-sql_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:126) ~[spark-sql_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.io.IOException: Invalid sync!
    at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:314) ~[classes/:1.7.7]
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:209) ~[classes/:1.7.7]
    ... 25 common frames omitted

Is there an easy way to skip a corrupted Avro file without reading the files one by one using sqlContext.avroFile(file)?

At present, my solution (a hack) is to use my own version of org.apache.avro.file.DataFileStream whose hasNext method returns false (to signal the end of the file) when java.io.IOException: Invalid sync! is thrown. Please see line 210 in
https://github.com/apache/avro/blob/branch-1.7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java

Thanks in advance for any assistance!

Shing
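The hack described above (have hasNext report the end of the file once "Invalid sync!" is hit) can be illustrated without patching Avro. The sketch below is a generic iterator wrapper in plain Scala; TruncateOnIOError is a hypothetical name made up for this example and is not part of Avro or Spark. It relies only on what the stack trace shows: the IOException arrives wrapped in a RuntimeException (AvroRuntimeException) thrown from hasNext.

```scala
import java.io.IOException

// Hypothetical wrapper (not an Avro or Spark class): ends iteration quietly
// when the underlying hasNext throws an IOException, or a RuntimeException
// whose cause is an IOException.
final class TruncateOnIOError[A](underlying: Iterator[A]) extends Iterator[A] {
  // Once a read failure has been seen, the iterator stays exhausted.
  private var failed = false

  override def hasNext: Boolean =
    !failed && {
      try underlying.hasNext
      catch {
        case _: IOException =>
          failed = true; false
        case e: RuntimeException if e.getCause.isInstanceOf[IOException] =>
          failed = true; false
      }
    }

  override def next(): A =
    if (hasNext) underlying.next()
    else throw new NoSuchElementException("next on empty iterator")
}
```

Note that this keeps the records read before the corruption point and silently drops everything after it, so it truncates a corrupted file rather than skipping it entirely. Logging the failure inside the catch clauses would make the data loss visible.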