Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22676#discussion_r223749938
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -273,44 +273,47 @@ private[csv] object UnivocityParser {
           inputStream: InputStream,
           shouldDropHeader: Boolean,
           tokenizer: CsvParser): Iterator[Array[String]] = {
    -    convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => 
tokens)
    +    val handleHeader: () => Unit =
    +      () => if (shouldDropHeader) tokenizer.parseNext
    +
    +    convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
       }
     
       /**
        * Parses a stream that contains CSV strings and turns it into an 
iterator of rows.
        */
       def parseStream(
           inputStream: InputStream,
    -      shouldDropHeader: Boolean,
           parser: UnivocityParser,
    -      schema: StructType,
    -      checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
    +      headerChecker: CSVHeaderChecker,
    +      schema: StructType): Iterator[InternalRow] = {
         val tokenizer = parser.tokenizer
         val safeParser = new FailureSafeParser[Array[String]](
           input => Seq(parser.convert(input)),
           parser.options.parseMode,
           schema,
           parser.options.columnNameOfCorruptRecord,
           parser.options.multiLine)
    -    convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { 
tokens =>
    +
    +    val handleHeader: () => Unit =
    +      () => headerChecker.checkHeaderColumnNames(tokenizer)
    +
    +    convertStream(inputStream, tokenizer, handleHeader) { tokens =>
           safeParser.parse(tokens)
         }.flatten
       }
     
       private def convertStream[T](
           inputStream: InputStream,
    -      shouldDropHeader: Boolean,
           tokenizer: CsvParser,
    -      checkHeader: Array[String] => Unit = _ => ())(
    +      handleHeader: () => Unit)(
           convert: Array[String] => T) = new Iterator[T] {
         tokenizer.beginParsing(inputStream)
    -    private var nextRecord = {
    -      if (shouldDropHeader) {
    -        val firstRecord = tokenizer.parseNext()
    -        checkHeader(firstRecord)
    -      }
    -      tokenizer.parseNext()
    -    }
    +
    +    // We can handle header here since here the stream is open.
    +    handleHeader()
    --- End diff --
    
    It is, but I guess it was already being done this way.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to