Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20894#discussion_r191827211
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 ---
    @@ -110,14 +114,81 @@ abstract class CSVDataSource extends Serializable {
       }
     }
     
    -object CSVDataSource {
    +object CSVDataSource extends Logging {
       def apply(options: CSVOptions): CSVDataSource = {
         if (options.multiLine) {
           MultiLineCSVDataSource
         } else {
           TextInputCSVDataSource
         }
       }
    +
    +  /**
    +   * Checks that column names in a CSV header and field names in the 
schema are the same
    +   * by taking into account case sensitivity.
    +   */
    +  def checkHeaderColumnNames(
    +      schema: StructType,
    +      columnNames: Array[String],
    +      fileName: String,
    +      enforceSchema: Boolean,
    +      caseSensitive: Boolean): Unit = {
    +    if (columnNames != null) {
    +      val fieldNames = schema.map(_.name).toIndexedSeq
    +      val (headerLen, schemaSize) = (columnNames.size, fieldNames.length)
    +      var errorMessage: Option[String] = None
    +
    +      if (headerLen == schemaSize) {
    +        var i = 0
    +        while (errorMessage.isEmpty && i < headerLen) {
    +          var (nameInSchema, nameInHeader) = (fieldNames(i), 
columnNames(i))
    +          if (!caseSensitive) {
    +            nameInSchema = nameInSchema.toLowerCase
    +            nameInHeader = nameInHeader.toLowerCase
    +          }
    +          if (nameInHeader != nameInSchema) {
    +            errorMessage = Some(
    +              s"""|CSV header is not conform to the schema.
    +                  | Header: ${columnNames.mkString(", ")}
    +                  | Schema: ${fieldNames.mkString(", ")}
    +                  |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
    +                  |CSV file: $fileName""".stripMargin)
    +          }
    +          i += 1
    +        }
    +      } else {
    +        errorMessage = Some(
    +          s"""|Number of column in CSV header is not equal to number of 
fields in the schema:
    +              | Header length: $headerLen, schema size: $schemaSize
    +              |CSV file: $fileName""".stripMargin)
    +      }
    +
    +      errorMessage.foreach { msg =>
    +        if (enforceSchema) {
    +          logWarning(msg)
    +        } else {
    +          throw new IllegalArgumentException(msg)
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Checks that CSV header contains the same column names as fields names 
in the given schema
    +   * by taking into account case sensitivity.
    +   */
    +  def checkHeader(
    +      header: String,
    +      parser: CsvParser,
    +      schema: StructType,
    +      fileName: String,
    +      enforceSchema: Boolean,
    +      caseSensitive: Boolean): Unit = {
    +    if (!enforceSchema) {
    --- End diff --
    
    The same here. It sounds like we need add a test case. 
    ```Scala
        val outputStream = new java.io.ByteArrayOutputStream()
        Console.withOut(outputStream) {
          xyz
        }
        assert(outputStream.toString.contains(...)
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to