Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20894#discussion_r188522937
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 ---
    @@ -118,16 +122,62 @@ object CSVDataSource {
           TextInputCSVDataSource
         }
       }
    +
    +  def checkHeaderColumnNames(schema: StructType, columnNames: 
Array[String], fileName: String,
    +      checkHeaderFlag: Boolean, caseSensitive: Boolean): Unit = {
    +    if (checkHeaderFlag && columnNames != null) {
    +      val fieldNames = schema.map(_.name).toIndexedSeq
    +      val (headerLen, schemaSize) = (columnNames.size, fieldNames.length)
    +      var error: Option[String] = None
    +
    +      if (headerLen == schemaSize) {
    +        var i = 0
    +        while (error.isEmpty && i < headerLen) {
    +          var (nameInSchema, nameInHeader) = (fieldNames(i), 
columnNames(i))
    +          if (caseSensitive == false) {
    +            nameInSchema = nameInSchema.toLowerCase
    +            nameInHeader = nameInHeader.toLowerCase
    +          }
    +          if (nameInHeader != nameInSchema) {
    +            error = Some(
    +              s"""|CSV file header does not contain the expected fields.
    +                  | Header: ${columnNames.mkString(", ")}
    +                  | Schema: ${fieldNames.mkString(", ")}
    +                  |Expected: ${columnNames(i)} but found: ${fieldNames(i)}
    +                  |CSV file: $fileName""".stripMargin
    +            )
    +          }
    +          i += 1
    +        }
    +      } else {
    +        error = Some(
    +          s"""|Number of column in CSV header is not equal to number of 
fields in the schema:
    +              | Header length: $headerLen, schema size: $schemaSize
    +              |CSV file: $fileName""".stripMargin
    +        )
    +      }
    +
    +      error.headOption.foreach { msg =>
    +        throw new IllegalArgumentException(msg)
    +      }
    +    }
    +  }
    +
    +  def checkHeader(header: String, parser: CsvParser, schema: StructType, 
fileName: String,
    +    checkHeaderFlag: Boolean, caseSensitive: Boolean): Unit = {
    +    if (checkHeaderFlag) {
    +      checkHeaderColumnNames(schema, parser.parseLine(header), fileName, 
checkHeaderFlag,
    +        caseSensitive)
    +    }
    +  }
     }
     
     object TextInputCSVDataSource extends CSVDataSource {
       override val isSplitable: Boolean = true
     
    -  override def readFile(
    -      conf: Configuration,
    -      file: PartitionedFile,
    -      parser: UnivocityParser,
    -      schema: StructType): Iterator[InternalRow] = {
    +  override def readFile(conf: Configuration, file: PartitionedFile, 
parser: UnivocityParser,
    --- End diff --
    
    Follow code style here:
    https://github.com/databricks/scala-style-guide#linelength


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to