sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r655599949
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -170,77 +170,135 @@ public int readInteger() {
* }
*/
public void readBatch(
Review comment:
Sorry @cloud-fan, I should've added more context in the PR description.
Let me add it here and copy it to the description later.
1. The column index filtering is largely implemented in parquet-mr (via
classes such as `ColumnIndex` and `ColumnIndexFilter`), and the filtered
Parquet pages are returned to Spark through the
`ParquetFileReader.readNextFilteredRowGroup` and
`ParquetFileReader.getFilteredRecordCount` APIs. Please see #31393 for the
related changes in the vectorized reader path.
2. Spark needs more work to handle mis-aligned Parquet pages returned from
the parquet-mr side when there are multiple columns whose type widths
differ (e.g., int and bigint). For this issue, @lxian already gave a pretty
good description in
[SPARK-34859](https://issues.apache.org/jira/browse/SPARK-34859). To support
the case, Spark needs to leverage the API
[`PageReadStore.getRowIndexes`](https://javadoc.io/doc/org.apache.parquet/parquet-column/latest/org/apache/parquet/column/page/PageReadStore.html),
which returns the indexes of all rows remaining after filtering within a
Parquet row group (note the difference between rows and values: for a flat
schema the two coincide, but for a nested schema they differ). In addition,
because there are gaps between pages, we need to know the index of the first
row in each page, so we can keep consuming values from a page and skip those
whose row index is not in the filtered set. This is provided by the
`DataPage.getFirstRowIndex` method.
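To illustrate the skipping logic above, here is a minimal, self-contained sketch. It is not the actual Spark implementation: the class and method names (`RowIndexSkipSketch`, `readPage`) are made up for illustration, and plain arrays stand in for parquet-mr's `PageReadStore.getRowIndexes()` and `DataPage.getFirstRowIndex()`. It shows how a reader can walk a page's values, keeping a value only when its global row index (page first-row index plus offset) appears in the filtered row indexes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PrimitiveIterator;

// Illustrative sketch only: consume a page's values while skipping rows that
// were filtered out by the column index.
final class RowIndexSkipSketch {
  /**
   * @param firstRowIndex   row index of the first value in the page
   *                        (what DataPage.getFirstRowIndex() would provide)
   * @param pageValues      decoded values of the page, one per row (flat schema)
   * @param filteredIndexes ascending row indexes that survived filtering
   *                        (what PageReadStore.getRowIndexes() would provide)
   * @return only the values whose row index survived filtering
   */
  static List<Integer> readPage(long firstRowIndex, int[] pageValues,
                                PrimitiveIterator.OfLong filteredIndexes) {
    List<Integer> out = new ArrayList<>();
    long rowIndex = firstRowIndex;
    // Skip filtered indexes that fall in the gap before this page starts.
    long next = nextAtLeast(filteredIndexes, firstRowIndex);
    for (int value : pageValues) {
      if (next == rowIndex) {
        out.add(value);  // row survived filtering: keep the value
        next = filteredIndexes.hasNext() ? filteredIndexes.nextLong() : Long.MAX_VALUE;
      }                  // otherwise: the value is consumed but skipped
      rowIndex++;
    }
    return out;
  }

  private static long nextAtLeast(PrimitiveIterator.OfLong it, long min) {
    while (it.hasNext()) {
      long idx = it.nextLong();
      if (idx >= min) return idx;
    }
    return Long.MAX_VALUE;
  }

  public static void main(String[] args) {
    // Page starts at row 100 (rows 0-99 were in a pruned page, hence the gap).
    int[] values = {10, 11, 12, 13, 14};  // rows 100..104
    long[] kept = {101, 103, 104};        // filtered row indexes for the group
    List<Integer> result = readPage(100, values, Arrays.stream(kept).iterator());
    System.out.println(result);           // [11, 13, 14]
  }
}
```

The point of the first-row index is visible in `nextAtLeast`: without it, the reader could not tell which filtered row indexes belong to this page versus a gap between pages.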
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
+/**
+ * Helper class to store intermediate state while reading a Parquet column chunk.
+ */
+final class ParquetReadState {
Review comment:
Yes this occurred to me the other day as well :) I think it's a good
idea. Let me move this refactoring part into a separate PR. Thanks.
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
+/**
+ * Helper class to store intermediate state while reading a Parquet column chunk.
+ */
+final class ParquetReadState {
Review comment:
Opened #33006
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]