sunchao commented on a change in pull request #34659:
URL: https://github.com/apache/spark/pull/34659#discussion_r769129914
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
##########
@@ -303,55 +313,88 @@ public boolean nextBatch() throws IOException {
}
private void initializeInternal() throws IOException,
UnsupportedOperationException {
- // Check that the requested schema is supported.
- missingColumns = new boolean[requestedSchema.getFieldCount()];
- List<ColumnDescriptor> columns = requestedSchema.getColumns();
- List<String[]> paths = requestedSchema.getPaths();
- for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
- Type t = requestedSchema.getFields().get(i);
- if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
- throw new UnsupportedOperationException("Complex types not
supported.");
- }
+ missingColumns = new HashSet<>();
+ for (ParquetColumn column :
JavaConverters.seqAsJavaList(parquetColumn.children())) {
+ checkColumn(column);
+ }
+ }
- String[] colPath = paths.get(i);
- if (fileSchema.containsPath(colPath)) {
- ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
- if (!fd.equals(columns.get(i))) {
+ /**
+ * Check whether a column from requested schema is missing from the file
schema, or whether it
+ * conforms to the type of the file schema.
+ */
+ private void checkColumn(ParquetColumn column) throws IOException {
+ String[] path = JavaConverters.seqAsJavaList(column.path()).toArray(new
String[0]);
+ if (containsPath(fileSchema, path)) {
+ if (column.isPrimitive()) {
+ ColumnDescriptor desc = column.descriptor().get();
+ ColumnDescriptor fd = fileSchema.getColumnDescription(desc.getPath());
+ if (!fd.equals(desc)) {
throw new UnsupportedOperationException("Schema evolution not
supported.");
}
- missingColumns[i] = false;
} else {
- if (columns.get(i).getMaxDefinitionLevel() == 0) {
- // Column is missing in data but the required data is non-nullable.
This file is invalid.
- throw new IOException("Required column is missing in data file. Col:
" +
- Arrays.toString(colPath));
+ for (ParquetColumn childColumn :
JavaConverters.seqAsJavaList(column.children())) {
Review comment:
Thanks, yeah, I can raise another PR to use a list in `ParquetColumn`.
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
##########
@@ -303,55 +313,88 @@ public boolean nextBatch() throws IOException {
}
private void initializeInternal() throws IOException,
UnsupportedOperationException {
- // Check that the requested schema is supported.
- missingColumns = new boolean[requestedSchema.getFieldCount()];
- List<ColumnDescriptor> columns = requestedSchema.getColumns();
- List<String[]> paths = requestedSchema.getPaths();
- for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
- Type t = requestedSchema.getFields().get(i);
- if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
- throw new UnsupportedOperationException("Complex types not
supported.");
- }
+ missingColumns = new HashSet<>();
+ for (ParquetColumn column :
JavaConverters.seqAsJavaList(parquetColumn.children())) {
+ checkColumn(column);
+ }
+ }
- String[] colPath = paths.get(i);
- if (fileSchema.containsPath(colPath)) {
- ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
- if (!fd.equals(columns.get(i))) {
+ /**
+ * Check whether a column from requested schema is missing from the file
schema, or whether it
+ * conforms to the type of the file schema.
+ */
+ private void checkColumn(ParquetColumn column) throws IOException {
+ String[] path = JavaConverters.seqAsJavaList(column.path()).toArray(new
String[0]);
+ if (containsPath(fileSchema, path)) {
+ if (column.isPrimitive()) {
+ ColumnDescriptor desc = column.descriptor().get();
+ ColumnDescriptor fd = fileSchema.getColumnDescription(desc.getPath());
+ if (!fd.equals(desc)) {
throw new UnsupportedOperationException("Schema evolution not
supported.");
}
- missingColumns[i] = false;
} else {
- if (columns.get(i).getMaxDefinitionLevel() == 0) {
- // Column is missing in data but the required data is non-nullable.
This file is invalid.
- throw new IOException("Required column is missing in data file. Col:
" +
- Arrays.toString(colPath));
+ for (ParquetColumn childColumn :
JavaConverters.seqAsJavaList(column.children())) {
Review comment:
Thanks, yeah, I can raise another PR to use a list in `ParquetColumn` after
this one is done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]