cashmand commented on code in PR #53120:
URL: https://github.com/apache/spark/pull/53120#discussion_r2539643856
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala:
##########
@@ -890,29 +890,25 @@ private[parquet] class ParquetRowConverter(
        // We may allow more than two children in the future, so consider this unsupported.
        throw QueryCompilationErrors.invalidVariantWrongNumFieldsError()
      }
-     val valueAndMetadata = Seq("value", "metadata").map { colName =>
+     val Seq(v, m) = Seq("value", "metadata").map { colName =>
        val idx = (0 until parquetType.getFieldCount())
-         .find(parquetType.getFieldName(_) == colName)
-       if (idx.isEmpty) {
-         throw QueryCompilationErrors.invalidVariantMissingFieldError(colName)
-       }
-       val child = parquetType.getType(idx.get)
+         .find(parquetType.getFieldName(_) == colName)
+         .getOrElse(throw QueryCompilationErrors.invalidVariantMissingFieldError(colName))
+       val child = parquetType.getType(idx)
        if (!child.isPrimitive || child.getRepetition != Type.Repetition.REQUIRED ||
Review Comment:
This entire function is essentially deprecated, right? I think we only call
it when `VARIANT_ALLOW_READING_SHREDDED` is false, which I think we should set
to true by default and eventually remove as an option. Maybe we can add a
comment in the method description to make that clear.
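For context on the refactor in the hunk above, here is a minimal, self-contained
sketch of the same find/getOrElse plus Seq-destructuring shape. The toy field list
and the `IllegalArgumentException` are stand-ins, not the real Parquet API or
Spark's error classes:

```scala
// Sketch only: toy field lookup mirroring the refactored shape above.
object FindOrElseSketch {
  def main(args: Array[String]): Unit = {
    val fieldNames = Seq("metadata", "value") // stand-in for parquetType's fields
    // Bind both indices at once; getOrElse throws on a missing field instead of
    // carrying an Option through an explicit isEmpty check and a .get.
    val Seq(v, m) = Seq("value", "metadata").map { colName =>
      fieldNames.indices
        .find(i => fieldNames(i) == colName)
        .getOrElse(throw new IllegalArgumentException(s"missing field: $colName"))
    }
    println(s"value index = $v, metadata index = $m") // value index = 1, metadata index = 0
  }
}
```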
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1593,6 +1593,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
+  val PARQUET_IGNORE_VARIANT_ANNOTATION =
+    buildConf("spark.sql.parquet.ignoreVariantAnnotation")
+      .doc("When true, ignore the variant logical type annotation and treat the Parquet " +
Review Comment:
Should we mark this conf as `.internal()`? I think the main use case is to
simplify debugging issues with the raw variant bytes, but let me know if
there's a reason for this conf that I'm missing. Assuming my understanding is
right, maybe we can also mention the intended use case in the doc comment.
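If the conf does get marked internal, it is a one-line addition to the builder
chain. A sketch of how the entry could look inside `object SQLConf` (not
standalone, since `buildConf` is SQLConf's own helper; the doc wording here is a
placeholder, not the final text):

```scala
val PARQUET_IGNORE_VARIANT_ANNOTATION =
  buildConf("spark.sql.parquet.ignoreVariantAnnotation")
    .internal() // hidden from user-facing docs; meant for debugging raw variant bytes
    .doc("When true, ignore the variant logical type annotation and treat the Parquet " +
      "column as its underlying physical type. Intended only to simplify debugging " +
      "issues with the raw variant bytes.")
    .booleanConf
    .createWithDefault(false)
```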
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala:
##########
@@ -374,8 +382,20 @@ class ParquetToSparkSchemaConverter(
       Option(field.getLogicalTypeAnnotation).fold(
         convertInternal(groupColumn, sparkReadType.map(_.asInstanceOf[StructType]))) {
         // Temporary workaround to read Shredded variant data
Review Comment:
Can you remove the "Temporary workaround" comment now?
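For readers unfamiliar with the `fold` in the hunk above: `Option(x).fold(ifEmpty)(f)`
evaluates the first argument when `x` is null and applies `f` otherwise, which is how
the converter falls back to a plain struct conversion when there is no logical type
annotation. A toy illustration with strings in place of Parquet types:

```scala
object OptionFoldSketch {
  def main(args: Array[String]): Unit = {
    // Option(null) is None, so fold picks the fallback branch.
    def describe(annotation: String): String =
      Option(annotation).fold("no annotation: convert as a plain struct") { ann =>
        s"annotation '$ann': use annotation-specific conversion"
      }
    println(describe(null))      // no annotation: convert as a plain struct
    println(describe("VARIANT")) // annotation 'VARIANT': use annotation-specific conversion
  }
}
```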
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala:
##########
@@ -646,7 +646,9 @@ case object SparkShreddingUtils {
  def parquetTypeToSparkType(parquetType: ParquetType): DataType = {
    val messageType = ParquetTypes.buildMessage().addField(parquetType).named("foo")
    val column = new ColumnIOFactory().getColumnIO(messageType)
-   new ParquetToSparkSchemaConverter().convertField(column.getChild(0)).sparkType
+   // We need the underlying file type regardless of the config.
Review Comment:
Nit: can you say "regardless of the ignoreVariantAnnotation config"? It's a
bit unclear what this comment is referring to.
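On the hunk itself: `parquetTypeToSparkType` wraps a single Parquet field in a
synthetic one-field message so that column IO can be built for it, then converts the
child back out. A sketch of that wrapping using the raw parquet-column APIs directly
(the required BINARY field and the message name "foo" are arbitrary choices for
illustration):

```scala
import org.apache.parquet.io.ColumnIOFactory
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Types

object MessageWrapSketch {
  def main(args: Array[String]): Unit = {
    // Any single field works; a required BINARY column stands in for the variant type.
    val field = Types.required(PrimitiveTypeName.BINARY).named("value")
    // Wrap it in a one-field message, as parquetTypeToSparkType does with "foo".
    val messageType = Types.buildMessage().addField(field).named("foo")
    val column = new ColumnIOFactory().getColumnIO(messageType)
    // The child's type is the original field, ready to hand to a schema converter.
    println(column.getChild(0).getType) // required binary value
  }
}
```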
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]