Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22374#discussion_r217242713 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala --- @@ -240,23 +240,25 @@ object TextInputCSVDataSource extends CSVDataSource { sparkSession: SparkSession, csv: Dataset[String], maybeFirstLine: Option[String], - parsedOptions: CSVOptions): StructType = maybeFirstLine match { - case Some(firstLine) => - val firstRow = new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine) - val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis - val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions) - val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions) - val tokenRDD = sampled.rdd.mapPartitions { iter => - val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions) - val linesWithoutHeader = - CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions) - val parser = new CsvParser(parsedOptions.asParserSettings) - linesWithoutHeader.map(parser.parseLine) - } - CSVInferSchema.infer(tokenRDD, header, parsedOptions) - case None => - // If the first line could not be read, just return the empty schema. 
- StructType(Nil) + parsedOptions: CSVOptions): StructType = { + val csvParser = new CsvParser(parsedOptions.asParserSettings) + maybeFirstLine.map(csvParser.parseLine(_)) match { + case Some(firstRow) if firstRow != null => + val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis + val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions) + val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions) + val tokenRDD = sampled.rdd.mapPartitions { iter => + val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions) + val linesWithoutHeader = + CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get, parsedOptions) + val parser = new CsvParser(parsedOptions.asParserSettings) + linesWithoutHeader.map(parser.parseLine) + } + CSVInferSchema.infer(tokenRDD, header, parsedOptions) --- End diff -- @MaxGekk, BTW what happens if the second line is a malformed record and parsing it returns `null`? From a cursory look, schema inference looks like it is going to throw an NPE.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org