Accidentally sent this to the dev mailing list; I meant to send it here. I have a Spark Java application that in the past used the hadoopFile interface to specify a custom TextInputFormat to be used when reading files. This custom class gracefully handled exceptions such as the EOFExceptions caused by corrupt gzip files in the input data. I recently switched to the CSV parser built into Spark, but now I am faced with the problem that any time a bad input file is encountered my whole job fails.
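For context, the "graceful handling" in my custom format boils down to treating an unexpected EOF as end-of-input rather than a fatal error. A minimal, Hadoop-free sketch of the idea (the class and method names here are mine, purely for illustration):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TruncatedGzipDemo {

    // Decompress as much as possible; a truncated/corrupt gzip stream
    // surfaces as an EOFException, which we swallow instead of propagating,
    // keeping whatever records were decoded before the corruption.
    static String readLeniently(byte[] gzipped) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        } catch (EOFException e) {
            // Truncated gzip trailer or deflate data: stop here, keep partial output.
        }
        return out.toString("UTF-8");
    }

    public static void main(String[] args) throws IOException {
        // Build a valid gzip payload, then chop off its last 4 bytes to
        // simulate the kind of truncated file that was failing my jobs.
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(raw)) {
            gz.write("line1\nline2\n".getBytes("UTF-8"));
        }
        byte[] full = raw.toByteArray();
        byte[] truncated = java.util.Arrays.copyOf(full, full.length - 4);
        System.out.println(readLeniently(truncated));
    }
}
```

My real input format does the same thing inside a Hadoop RecordReader, so a corrupt split just ends early instead of killing the task.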
My code to load the data using CSV is:

    Dataset<Row> csv = sparkSession.read()
        .option("delimiter", parseSettings.getDelimiter().toString())
        .option("quote", parseSettings.getQuote())
        .option("parserLib", "UNIVOCITY")
        .csv(paths);

Previously I would load the data using:

    JavaRDD<String> lines = sc.newAPIHadoopFile(filePaths, NewInputFormat.class,
            LongWritable.class, Text.class, sc.hadoopConfiguration())
        .values()
        .map(Text::toString);

Looking at CSVFileFormat.scala, it appears that if I could override the spot in the private readText method where it passes TextInputFormat to hadoopFile, substituting my custom format, I would be able to achieve what I want:

    private def readText(
        sparkSession: SparkSession,
        options: CSVOptions,
        location: String): RDD[String] = {
      if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
        sparkSession.sparkContext.textFile(location)
      } else {
        val charset = options.charset
        sparkSession.sparkContext
          // This is where I would want to be able to specify my
          // input format instead of TextInputFormat
          .hadoopFile[LongWritable, Text, TextInputFormat](location)
          .mapPartitions(_.map(pair =>
            new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
      }
    }

Does anyone know of another way to prevent the corrupt files from failing my job, or could anyone help make the required changes so the TextInputFormat is customizable? I have only just started looking at Scala.

Thanks,
Nathan