Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22374#discussion_r217242713
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
---
@@ -240,23 +240,25 @@ object TextInputCSVDataSource extends CSVDataSource {
sparkSession: SparkSession,
csv: Dataset[String],
maybeFirstLine: Option[String],
- parsedOptions: CSVOptions): StructType = maybeFirstLine match {
- case Some(firstLine) =>
- val firstRow = new
CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
- val caseSensitive =
sparkSession.sessionState.conf.caseSensitiveAnalysis
- val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
- val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions)
- val tokenRDD = sampled.rdd.mapPartitions { iter =>
- val filteredLines = CSVUtils.filterCommentAndEmpty(iter,
parsedOptions)
- val linesWithoutHeader =
- CSVUtils.filterHeaderLine(filteredLines, firstLine,
parsedOptions)
- val parser = new CsvParser(parsedOptions.asParserSettings)
- linesWithoutHeader.map(parser.parseLine)
- }
- CSVInferSchema.infer(tokenRDD, header, parsedOptions)
- case None =>
- // If the first line could not be read, just return the empty schema.
- StructType(Nil)
+ parsedOptions: CSVOptions): StructType = {
+ val csvParser = new CsvParser(parsedOptions.asParserSettings)
+ maybeFirstLine.map(csvParser.parseLine(_)) match {
+ case Some(firstRow) if firstRow != null =>
+ val caseSensitive =
sparkSession.sessionState.conf.caseSensitiveAnalysis
+ val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+ val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions)
+ val tokenRDD = sampled.rdd.mapPartitions { iter =>
+ val filteredLines = CSVUtils.filterCommentAndEmpty(iter,
parsedOptions)
+ val linesWithoutHeader =
+ CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get,
parsedOptions)
+ val parser = new CsvParser(parsedOptions.asParserSettings)
+ linesWithoutHeader.map(parser.parseLine)
+ }
+ CSVInferSchema.infer(tokenRDD, header, parsedOptions)
--- End diff --
@MaxGekk, BTW, what happens if the second line is a malformed record and the
parser returns `null` for it? From a cursory look, schema inference seems likely
to throw a NullPointerException (NPE) in that case.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]