xkrogen commented on a change in pull request #30869:
URL: https://github.com/apache/spark/pull/30869#discussion_r669721664



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
##########
@@ -163,6 +163,12 @@ trait FileFormat {
    * By default all data types are supported.
    */
   def supportDataType(dataType: DataType): Boolean = true
+
+  /**
+   * Check whether target schema field name is valid.
+   * If field name is invalid, will throw [[AnalysisException]].

Review comment:
       Can you use `@throws` for this documentation?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
##########
@@ -933,19 +934,36 @@ object DDLUtils {
         case HIVE_PROVIDER =>
           val serde = table.storage.serde
           if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
-            OrcFileFormat.checkFieldNames(colNames)
+            checkDataColNames("orc", colNames)
           } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
             serde == Some("parquet.hive.serde.ParquetHiveSerDe") ||
             serde == 
Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) {
-            ParquetSchemaConverter.checkFieldNames(colNames)
+            checkDataColNames("parquet", colNames)
+          } else if (serde == HiveSerDe.sourceToSerDe("avro").get.serde) {
+            checkDataColNames("avro", colNames)
           }
-        case "parquet" => ParquetSchemaConverter.checkFieldNames(colNames)
-        case "orc" => OrcFileFormat.checkFieldNames(colNames)
+        case provider if Seq("parquet", "orc", "avro").contains(provider) =>
+          checkDataColNames(provider, colNames)
         case _ =>
       }
     }
   }
 
+  private[sql] def checkDataColNames(provider: String, colNames: Seq[String]): 
Unit = {
+    try {
+      DataSource.lookupDataSource(provider, 
SQLConf.get).getConstructor().newInstance() match {
+        case f: FileFormat => f.checkFieldNames(colNames)
+        case f: FileDataSourceV2 => f.checkFieldNames(colNames)
+        case _ =>
+      }
+    } catch {
+      case e: AnalysisException if e.getMessage.contains("contains invalid 
character") => throw e
+      case e: SchemaParseException => throw e
+      case e: Throwable =>
+        logError(s"Failed to find data source: $provider when check data 
column names.", e)

Review comment:
       We should catch a more specific exception here. At least limited to 
`NonFatal`, but really it should probably just be 
`ReflectiveOperationException` and `SecurityException`. I also don't see why we 
catch-and-rethrow `AnalysisException` and `SchemaParseException`?

##########
File path: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
##########
@@ -153,6 +154,31 @@ private[sql] class AvroFileFormat extends FileFormat
   }
 
   override def supportDataType(dataType: DataType): Boolean = 
AvroUtils.supportsDataType(dataType)
+
+  override def checkFieldNames(names: Seq[String]): Unit = {
+    names.foreach(checkFieldName)
+  }
+
+  private def checkFieldName(name: String): Unit = {
+    val length = name.length
+    if (length == 0) {
+      throw new SchemaParseException("Empty name")
+    } else {
+      val first = name.charAt(0)
+      if (!Character.isLetter(first) && first != '_') {
+        throw new SchemaParseException("Illegal initial character: " + name)
+      } else {
+        var i = 1
+        while (i < length) {
+          val c = name.charAt(i)
+          if (!Character.isLetterOrDigit(c) && c != '_') {
+            throw new SchemaParseException("Illegal character in: " + name)
+          }
+          i += 1
+        }
+      }

Review comment:
       Looks like this code is directly copied from `Schema.validateName`. At 
minimum, we need to provide a copyright attribution here. It's a shame that the 
function isn't exposed publicly.
   
   Rather than duplicating the validation logic here, I'm wondering if we 
should work around it by creating a placeholder schema with the fields from 
`checkFieldNames` (the types can be arbitrary) and asking Avro to parse it, and 
treating the field names as valid if Avro doesn't throw a 
`SchemaParseException`, e.g.:
   ```
           val emptyBuilder = SchemaBuilder.record("foo").fields()
           val fullBuilder = names.foldLeft(emptyBuilder)((builder, name) => 
builder.requiredInt(name))
           fullBuilder.endRecord()
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to