cashmand commented on code in PR #53120:
URL: https://github.com/apache/spark/pull/53120#discussion_r2539643856


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala:
##########
@@ -890,29 +890,25 @@ private[parquet] class ParquetRowConverter(
         // We may allow more than two children in the future, so consider this unsupported.
         throw QueryCompilationErrors.invalidVariantWrongNumFieldsError()
       }
-      val valueAndMetadata = Seq("value", "metadata").map { colName =>
+      val Seq(v, m) = Seq("value", "metadata").map { colName =>
         val idx = (0 until parquetType.getFieldCount())
-            .find(parquetType.getFieldName(_) == colName)
-        if (idx.isEmpty) {
-          throw QueryCompilationErrors.invalidVariantMissingFieldError(colName)
-        }
-        val child = parquetType.getType(idx.get)
+          .find(parquetType.getFieldName(_) == colName)
+          .getOrElse(throw QueryCompilationErrors.invalidVariantMissingFieldError(colName))
+        val child = parquetType.getType(idx)
         if (!child.isPrimitive || child.getRepetition != Type.Repetition.REQUIRED ||

Review Comment:
   This entire function is essentially deprecated, right? I think we only
   call it when `VARIANT_ALLOW_READING_SHREDDED` is false, which I think we
   should set to true by default and eventually remove as an option. Maybe
   we can add a comment in the method description to make that clear.
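
   Rough sketch of the kind of note I have in mind (the wording and the
   summary line are placeholders, assuming this path really is only reached
   when `VARIANT_ALLOW_READING_SHREDDED` is false):

   ```scala
   /**
    * Converter for variant columns read without shredding support.
    *
    * NOTE: this code path is only exercised when VARIANT_ALLOW_READING_SHREDDED
    * is false. Once that conf defaults to true and is eventually removed, this
    * method should be removed along with it.
    */
   ```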



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1593,6 +1593,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val PARQUET_IGNORE_VARIANT_ANNOTATION =
+    buildConf("spark.sql.parquet.ignoreVariantAnnotation")
+      .doc("When true, ignore the variant logical type annotation and treat the Parquet " +

Review Comment:
   Should we mark this conf as `.internal()`? I think the main use case is to 
simplify debugging issues with the raw variant bytes, but let me know if 
there's a reason for this conf that I'm missing. Assuming my understanding is 
right, maybe we can also mention the intended use case in the doc comment.
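
   For instance, something like the following (just a sketch -- I'm eliding
   the middle of the existing doc string and the rest of the builder chain,
   and the added sentence is only suggested wording):

   ```scala
   val PARQUET_IGNORE_VARIANT_ANNOTATION =
     buildConf("spark.sql.parquet.ignoreVariantAnnotation")
       .internal()  // debugging-only knob, not intended for end users
       .doc("When true, ignore the variant logical type annotation and treat the Parquet " +
         "... Intended only for debugging issues with the raw variant bytes.")
       .booleanConf
       .createWithDefault(false)
   ```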



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala:
##########
@@ -374,8 +382,20 @@ class ParquetToSparkSchemaConverter(
     Option(field.getLogicalTypeAnnotation).fold(
      convertInternal(groupColumn, sparkReadType.map(_.asInstanceOf[StructType]))) {
       // Temporary workaround to read Shredded variant data

Review Comment:
   Can you remove the "Temporary workaround" comment now?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala:
##########
@@ -646,7 +646,9 @@ case object SparkShreddingUtils {
   def parquetTypeToSparkType(parquetType: ParquetType): DataType = {
    val messageType = ParquetTypes.buildMessage().addField(parquetType).named("foo")
     val column = new ColumnIOFactory().getColumnIO(messageType)
-    new ParquetToSparkSchemaConverter().convertField(column.getChild(0)).sparkType
+    // We need the underlying file type regardless of the config.

Review Comment:
   Nit: can you say "regardless of the ignoreVariantAnnotation config"? It's a 
bit unclear what this comment is referring to.
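
   For example:

   ```scala
   // We need the underlying file type regardless of the ignoreVariantAnnotation config.
   ```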



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

