[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17704971#comment-17704971 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1483892749

> @parthchandra Do you have time to resolve the conflicts? I think it would be nice to be included in the next release.

Done

> Implement async IO for Parquet file reader
> ------------------------------------------
>
>                 Key: PARQUET-2149
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2149
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>            Reporter: Parth Chandra
>            Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified):
> - For every column -> read from storage in 8MB blocks -> read all uncompressed pages into an output queue
> - From the output queues -> (downstream) decompression + decoding
>
> This flow is serialized, which means that downstream threads are blocked until the data has been read. Because a large part of the time is spent waiting for data from storage, threads are idle and CPU utilization is really low.
>
> There is no reason why this cannot be made asynchronous _and_ parallel. So for column _i_: read one chunk at a time from storage -> intermediate output queue -> read one uncompressed page at a time -> output queue -> (downstream) decompression + decoding.
>
> Note that this can be made completely self-contained in ParquetFileReader, and downstream implementations like Iceberg and Spark will automatically be able to take advantage without code changes as long as the ParquetFileReader APIs are not changed.
>
> In past work with async IO ([Drill - async page reader|https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]), I have seen a 2x-3x improvement in reading speed for Parquet files.
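To make the proposed pipeline concrete, here is a minimal, self-contained sketch of the producer/consumer shape the description outlines. This is illustrative only and not the PR's actual code; the 8MB block size comes from the description, everything else (class name, pool sizes, queue depth) is assumed.

```
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncReadSketch {
  // Sentinel signalling end-of-column to the consumer.
  private static final ByteBuffer EOF = ByteBuffer.allocate(0);

  public static void main(String[] args) throws Exception {
    ExecutorService ioPool = Executors.newFixedThreadPool(2);
    BlockingQueue<ByteBuffer> outputQueue = new ArrayBlockingQueue<>(8);

    // Producer: reads "storage" in 8MB blocks; here we fake ten blocks.
    ioPool.submit(() -> {
      for (int i = 0; i < 10; i++) {
        ByteBuffer block = ByteBuffer.allocate(8 * 1024 * 1024); // stand-in for readFully()
        outputQueue.put(block); // blocks if the consumer falls behind
      }
      outputQueue.put(EOF);
      return null;
    });

    // Consumer: decompression + decoding now overlaps with the reads above
    // instead of waiting for all of the I/O to finish first.
    ByteBuffer block;
    while ((block = outputQueue.take()) != EOF) {
      // decompress + decode the block here
    }
    ioPool.shutdown();
  }
}
```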
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17704399#comment-17704399 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

wgtmac commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1482128840

@parthchandra Do you have time to resolve the conflicts? I think it would be nice to be included in the next release.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17704398#comment-17704398 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1482127933

> Sorry to interrupt, just wondering if this PR can work with [S3AsyncClient](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/S3AsyncClient.html) by any chance? @parthchandra
>
> Thanks!

This PR uses the HDFS interface to S3 (s3a) if the URL is s3a. I don't think the s3a implementation uses S3AsyncClient at all.
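For context on the answer above: the reader goes through Hadoop's FileSystem abstraction, so an s3a:// URL is served by hadoop-aws's S3AFileSystem rather than the AWS SDK's S3AsyncClient. A minimal sketch of opening a Parquet file that way, assuming hadoop-aws is on the classpath and credentials are configured; the bucket and file names are hypothetical:

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class S3aReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // An s3a:// URL routes through the hadoop-aws S3AFileSystem, not S3AsyncClient.
    Path path = new Path("s3a://my-bucket/store_sales/part-00000.parquet"); // hypothetical path
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
      System.out.println("row count: " + reader.getRecordCount());
    }
  }
}
```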
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17704031#comment-17704031 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

hazelnutsgz commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1480892881

Sorry to interrupt, just wondering if this PR can work with [S3AsyncClient](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/S3AsyncClient.html) by any chance? @parthchandra

Thanks!
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694116#comment-17694116 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1446796605

Also try a query like

```
select
  SUM(length(IFNULL(ss_sold_date_sk, ' '))),
  SUM(length(IFNULL(ss_sold_time_sk, ' '))),
  SUM(length(IFNULL(ss_item_sk, ' '))),
  SUM(length(IFNULL(ss_customer_sk, ' '))),
  SUM(length(IFNULL(ss_cdemo_sk, ' '))),
  SUM(length(IFNULL(ss_hdemo_sk, ' '))),
  SUM(length(IFNULL(ss_addr_sk, ' '))),
  SUM(length(IFNULL(ss_store_sk, ' '))),
  SUM(length(IFNULL(ss_promo_sk, ' '))),
  SUM(length(IFNULL(ss_ticket_number, ' '))),
  SUM(ss_quantity),
  SUM(ss_wholesale_cost),
  SUM(ss_list_price),
  SUM(ss_sales_price),
  SUM(ss_ext_discount_amt),
  SUM(ss_ext_sales_price),
  SUM(ss_ext_wholesale_cost),
  SUM(ss_ext_list_price),
  SUM(ss_ext_tax),
  SUM(ss_coupon_amt),
  SUM(ss_net_paid),
  SUM(ss_net_paid_inc_tax),
  SUM(ss_net_profit)
from store_sales
```

which avoids the expensive sort.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694065#comment-17694065 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

whcdjj commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1446487715

> Looks correct to me. A couple of questions: are you running this on a cluster or on a local system? And is the data on SSDs? If you are on a single machine, there might not be enough CPU, and the async threads may be contending for time with the processing threads. We'll need some profiling info for a better diagnosis. Also, with SSDs, reads from the file system are so fast that the async feature might show very little improvement. You could turn on the debug logging level for FilePageReader and AsyncMultiBufferInputStream. We can continue this in a thread outside this PR.

OK, I will test with debug logging for more details over the next few days.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693554#comment-17693554 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1445230916

Looks correct to me. A couple of questions: are you running this on a cluster or on a local system? And is the data on SSDs? If you are on a single machine, there might not be enough CPU, and the async threads may be contending for time with the processing threads. We'll need some profiling info for a better diagnosis. Also, with SSDs, reads from the file system are so fast that the async feature might show very little improvement. You could turn on the debug logging level for FilePageReader and AsyncMultiBufferInputStream. We can continue this in a thread outside this PR.
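For reference, one way to turn on that debug logging programmatically, assuming an slf4j-log4j12 (log4j 1.x) binding on the classpath; with logback or log4j2 you would set the same logger names in their respective config files instead. The logger names match the package paths of the classes added by this PR:

```
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class EnableAsyncDebugLogging {
  public static void main(String[] args) {
    // Programmatic equivalent of setting these loggers to DEBUG in log4j.properties.
    Logger.getLogger("org.apache.parquet.hadoop.FilePageReader").setLevel(Level.DEBUG);
    Logger.getLogger("org.apache.parquet.bytes.AsyncMultiBufferInputStream").setLevel(Level.DEBUG);
  }
}
```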
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693437#comment-17693437 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

whcdjj commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1445021515

> My test is spark.sql("select * from store_sales order by ss_customer_sk limit 10"); store_sales is a table from 1TB TPC-DS. The parquet-io and parquet-process thread pools are hardcoded in ParquetReadOptions.java like this:

```
public static class Builder {
  protected ExecutorService ioThreadPool = Executors.newFixedThreadPool(4);
  protected ExecutorService processThreadPool = Executors.newFixedThreadPool(4);
}
```

I also ran the following test against a local filesystem using the 100GB TPC-DS store_sales table, and I see a degradation with the async IO feature:

```
test("parquet reader select") {
  val sc = SparkSession.builder().master("local[4]").getOrCreate()
  val df = sc.read.parquet("file:///D:\\work\\test\\tpcds\\store_sales")
  df.createOrReplaceTempView("table")
  val start = System.currentTimeMillis()
  sc.sql("select * from table order by ss_customer_sk limit 10").show()
  val end = System.currentTimeMillis()
  System.out.println("time: " + (end - start))
}
```

without this feature -> time: 7240
with this feature -> time: 19923

Threads are as expected.

Where did I go wrong, and can you show me the correct way to use this feature?
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693284#comment-17693284 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1444106912

> Hi, I am very interested in this optimization and just have some questions when testing in a cluster with 4 nodes/96 cores using Spark 3.1. Unfortunately, I see little improvement.

You're likely to see improvement in cases where file I/O is the bottleneck. Most TPC-DS queries are join heavy and you will see little improvement there. You might do better with TPC-H.

> I am confused about whether it is necessary to keep spark.sql.parquet.enableVectorizedReader = false in Spark when testing with Spark 3.1, and how I can set the parquet buffer size.

It's probably best to leave the Parquet (read) buffer size untouched. You should keep `spark.sql.parquet.enableVectorizedReader = true` irrespective of this. This feature improves the I/O speed of reading raw data. The Spark vectorized reader kicks in after data is read from storage; it converts the raw data into Spark's internal columnar representation and is faster than the row-based version.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693023#comment-17693023 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

whcdjj commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1442878000

Hi, I am very interested in this optimization and just have some questions when testing in a cluster with 4 nodes/96 cores using Spark 3.1. Unfortunately, I see little improvement. I am confused about whether it is necessary to keep spark.sql.parquet.enableVectorizedReader = false in Spark when testing with Spark 3.1, and how I can set the parquet buffer size. Sincerely asking for advice @parthchandra
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682807#comment-17682807 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

wgtmac commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1411338630

> @kazuyukitanimura @steveloughran @kbendick @ggershinsky @wgtmac @theosib-amazon Do you still have comments?

It looks good to me. Please feel free to merge as you see fit. @shangxinli
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17643502#comment-17643502 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

parthchandra commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1039934636

## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
## @@ -0,0 +1,162 @@

```
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.parquet.bytes;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import org.apache.parquet.io.SeekableInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AsyncMultiBufferInputStream extends MultiBufferInputStream {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);

  private int fetchIndex = 0;
  private final SeekableInputStream fileInputStream;
  private int readIndex = 0;
  private ExecutorService threadPool;
  private LinkedBlockingQueue<Future<Void>> readFutures;
  private boolean closed = false;

  private LongAdder totalTimeBlocked = new LongAdder();
  private LongAdder totalCountBlocked = new LongAdder();
  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);

  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
      List<ByteBuffer> buffers) {
    super(buffers);
    this.fileInputStream = fileInputStream;
    this.threadPool = threadPool;
    readFutures = new LinkedBlockingQueue<>(buffers.size());
    if (LOG.isDebugEnabled()) {
      LOG.debug("ASYNC: Begin read into buffers ");
      for (ByteBuffer buf : buffers) {
        LOG.debug("ASYNC: buffer {} ", buf);
      }
    }
    fetchAll();
  }

  private void checkState() {
    if (closed) {
      throw new RuntimeException("Stream is closed");
    }
  }

  private void fetchAll() {
    checkState();
    submitReadTask(0);
  }

  private void submitReadTask(int bufferNo) {
    ByteBuffer buffer = buffers.get(bufferNo);
    try {
      readFutures.put(threadPool.submit(() -> {
          readOneBuffer(buffer);
          if (bufferNo < buffers.size() - 1) {
            submitReadTask(bufferNo + 1);
          }
          return null;
        })
      );
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  private void readOneBuffer(ByteBuffer buffer) {
    long startTime = System.nanoTime();
    try {
      fileInputStream.readFully(buffer);
      buffer.flip();
      long readCompleted = System.nanoTime();
      long timeSpent = readCompleted - startTime;
      LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
      fetchIndex++;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public boolean nextBuffer() {
    checkState();
    // hack: parent constructor can call this method before this class is fully initialized.
    // Just return without doing anything.
    if (readFutures == null) {
      return false;
    }
    if (readIndex < buffers.size()) {
      long start = System.nanoTime();
      try {
        if (LOG.isDebugEnabled()) {
          LOG.debug("ASYNC (next): Getting next buffer");
        }
        Future<Void> future = readFutures.take();
        future.get();
        long timeSpent = System.nanoTime() - start;
        totalCountBlocked.add(1);
        totalTimeBlocked.add(timeSpent);
        maxTimeBlocked.accumulate(timeSpent);
        if (LOG.isDebugEnabled()) {
          LOG.debug("ASYNC (next): {}: Time blocked for read {} ns", this, timeSpent);
        }
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
```
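The submitReadTask pattern above is worth calling out: each task schedules the read of the next buffer only after finishing its own, so buffers complete strictly in file order while the consumer can already block on the earliest future. A standalone sketch of the same idea, illustrative and not the PR's code:

```
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ChainedTasks {
  // Each task handles one item, then schedules the next, so completions stay
  // in order even though the work happens off the calling thread.
  static void submit(ExecutorService pool, List<String> items,
      BlockingQueue<Future<Void>> futures, int i) throws InterruptedException {
    futures.put(pool.submit(() -> {
      System.out.println("read " + items.get(i)); // stand-in for readOneBuffer()
      if (i < items.size() - 1) {
        submit(pool, items, futures, i + 1);
      }
      return null;
    }));
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    BlockingQueue<Future<Void>> futures = new LinkedBlockingQueue<>();
    List<String> buffers = Arrays.asList("buf0", "buf1", "buf2");
    submit(pool, buffers, futures, 0);
    for (int i = 0; i < buffers.size(); i++) {
      futures.take().get(); // consumer blocks on buffers in read order, like nextBuffer()
    }
    pool.shutdown();
  }
}
```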
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642878#comment-17642878 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

shangxinli commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1336203668

@kazuyukitanimura @steveloughran @kbendick @ggershinsky @wgtmac @theosib-amazon Do you still have comments?
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642875#comment-17642875 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1038808056

## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
(full file context shown above; the review comment targets these lines)

```
  @Override
  public boolean nextBuffer() {
    checkState();
    // hack: parent constructor can call this method before this class is fully initialized.
```

Review Comment:
I see the comment with 'hack'. What is the proper implementation?
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642873#comment-17642873 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1038807754

## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
(full file context shown above)
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638794#comment-17638794 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1327957394

@wgtmac Thank you!
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638053#comment-17638053 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

wgtmac commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1325873832

> Updated. The thread pools are now set thru ParquetReadOptions

Thanks for addressing this! I agree that adopting Hadoop vectored IO is an orthogonal topic and can be a separate patch when the API is ready. So I believe this patch is complete and have voted +1. Any improvement can be added via follow-up patches. I may request other maintainers for a final approval. @shangxinli
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637996#comment-17637996 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1325768325

Updated. The thread pools are now set thru ParquetReadOptions
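Based on the ParquetReadOptions.Builder fields quoted earlier in this thread (ioThreadPool and processThreadPool), supplying custom pools might look roughly like the sketch below. The withIOThreadPool/withProcessThreadPool setter names are guesses; check the PR for the actual builder methods.

```
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.parquet.ParquetReadOptions;

public class AsyncReaderOptions {
  public static ParquetReadOptions build() {
    // Pool sizes are illustrative; tune against your storage latency and CPU count.
    ExecutorService ioPool = Executors.newFixedThreadPool(8);
    ExecutorService processPool = Executors.newFixedThreadPool(8);
    return ParquetReadOptions.builder()
        .withIOThreadPool(ioPool)           // hypothetical setter name
        .withProcessThreadPool(processPool) // hypothetical setter name
        .build();
  }
}
```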
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637935#comment-17637935 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

parthchandra commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1030813744

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java:
## @@ -0,0 +1,361 @@
(Apache license header elided; identical to the one shown above)

```
package org.apache.parquet.hadoop;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.format.BlockCipher.Decryptor;
import org.apache.parquet.format.DataPageHeader;
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.DictionaryPageHeader;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.hadoop.ParquetFileReader.Chunk;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.PrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Encapsulates the reading of a single page.
 */
public class FilePageReader implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(FilePageReader.class);

  private final ParquetFileReader parquetFileReader;
  private final Chunk chunk;
  private final int currentBlock;
  private final BlockCipher.Decryptor headerBlockDecryptor;
  private final BlockCipher.Decryptor pageBlockDecryptor;
  private final byte[] aadPrefix;
  private final int rowGroupOrdinal;
  private final int columnOrdinal;

  // state
  private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
  private DictionaryPage dictionaryPage = null;
  private int pageIndex = 0;
  private long valuesCountReadSoFar = 0;
  private int dataPageCountReadSoFar = 0;

  // derived
  private final PrimitiveType type;
  private final byte[] dataPageAAD;
  private byte[] dataPageHeaderAAD = null;

  private final BytesInputDecompressor decompressor;

  private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();

  private final LongAdder totalTimeReadOnePage = new LongAdder();
  private final LongAdder totalCountReadOnePage = new LongAdder();
  private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L);
  private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
  private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
  private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L);

  public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int currentBlock,
      Decryptor headerBlockDecryptor, Decryptor pageBlockDecryptor, byte[] aadPrefix,
      int rowGroupOrdinal, int columnOrdinal, BytesInputDecompressor decompressor) {
    this.parquetFileReader = parquetFileReader;
    this.chunk = chunk;
    this.currentBlock = currentBlock;
    this.headerBlockDecryptor = headerBlockDecryptor;
    this.pageBlockDecryptor = pageBlockDecryptor;
    this.aadPrefix = aadPrefix;
    this.rowGroupOrdinal = rowGroupOrdinal;
    this.columnOrdinal = columnOrdinal;
    this.decompressor = decompressor;

    this.type =
```
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637846#comment-17637846 ] ASF GitHub Bot commented on PARQUET-2149: - wgtmac commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1030605835 ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java: ##
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636956#comment-17636956 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1322881480 Let me verify that merging the thread pools is not dangerous and post an update. After that, it'd be great if you can take it.
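For context on why that verification matters: if the merged pool is bounded and a page-processing task blocks on an IO future scheduled into the same pool, the reader can deadlock by thread starvation. A self-contained illustration of the hazard (not code from the PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolMergeDeadlock {
  public static void main(String[] args) throws Exception {
    // A single merged pool with a hard bound on threads.
    ExecutorService pool = Executors.newFixedThreadPool(1);
    Future<String> processing = pool.submit(() -> {
      // The "process" task schedules its own IO into the same pool...
      Future<String> io = pool.submit(() -> "page bytes");
      // ...and blocks waiting for it. The only worker is busy running this
      // task, so the IO task never starts: classic thread-starvation deadlock.
      return io.get();
    });
    System.out.println(processing.get()); // hangs forever
  }
}
```

An unbounded cached pool avoids this particular failure mode, which lines up with wgtmac's condition below that the merge is safe only if process-side tasks never wait on IO-side tasks.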
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636711#comment-17636711 ] ASF GitHub Bot commented on PARQUET-2149: - wgtmac commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1322172984 > > > IMO, switching `ioThreadPool` and `processThreadPool` to the reader instance level will make it more flexible. > > > > > > I've changed the thread pool so that it is not initialized by default but I left them as static members. Ideally, there should be a single IO thread pool that handles all the IO for a process, and the size of the pool is determined by the bandwidth of the underlying storage system. Making them per instance is not an issue though. The calling code can decide to set the same thread pool for all instances and achieve the same result. Let me update this. > > Also, any changes you want to make are fine with me, and the help is certainly appreciated! > > I'm thinking of merging the thread pools into a single `ioThreadPool` and making it settable through `ParquetReadOptions` (like the allocator is). The work being done by the `processThreadPool` is rather small and maybe we can do away with it. Adding the pool via `ParquetReadOptions` makes it easier to use with `ParquetReader` (used a lot in unit tests). WDYT? Sorry for my late reply. Setting the thread pools via `ParquetReadOptions` is a good idea, and that is exactly how I want to do away with the static members. Merging `ioThreadPool` and `processThreadPool` into a single pool should work if the tasks in the `processThreadPool` do not wait for the return of tasks in the `ioThreadPool`. I will look into the details later. BTW, I don't have permission to directly update your PR in place as I am not yet a maintainer of the repo. I may need to open a new one by copying what you have done here and adding you as a co-author. WDYT? If that sounds good to you, I can proceed. @parthchandra
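A sketch of the caller-side wiring being proposed, modeled on how the allocator is already passed; `withIOThreadPool` is the hypothetical setter under discussion, while `withAllocator` is existing API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.HeapByteBufferAllocator;

public class ReadOptionsWiring {
  public static ParquetReadOptions buildOptions() {
    // Application-owned pool, sized for the storage system, shared across readers.
    ExecutorService ioPool = Executors.newFixedThreadPool(8, r -> new Thread(r, "parquet-io"));
    return ParquetReadOptions.builder()
        .withAllocator(new HeapByteBufferAllocator()) // existing API
        // .withIOThreadPool(ioPool)                  // proposed in this thread, not yet in the API
        .build();
  }
}
```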
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635582#comment-17635582 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1319411064 > > IMO, switching `ioThreadPool` and `processThreadPool` to the reader instance level will make it more flexible. > > I've changed the thread pool so that it is not initialized by default but I left them as static members. Ideally, there should be a single IO thread pool that handles all the IO for a process, and the size of the pool is determined by the bandwidth of the underlying storage system. Making them per instance is not an issue though. The calling code can decide to set the same thread pool for all instances and achieve the same result. Let me update this. > > Also, any changes you want to make are fine with me, and the help is certainly appreciated! I'm thinking of merging the thread pools into a single ioThreadPool and making the ioThreadPool settable through `ParquetReadOptions` (like the allocator is). The work being done by the processThreadPool is rather small and maybe we can do away with it. Adding the pool via `ParquetReadOptions` makes it easier to use with `ParquetReader` (used a lot in unit tests). WDYT?
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635497#comment-17635497 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1319040697 @wgtmac thank you for looking at this. I don't have any more TODOs on this PR. > * Adopt the incoming Hadoop vectored io api. This should be part of another PR. There is a draft PR (#999) open for this. Once that is merged in, I can revisit the async I/O code and incorporate the vectored io api. In other experiments I have seen that async io gives better results over slower networks. With faster network connections, as is the case where we are reading from S3 within an AWS environment, reading in parallel (as the vectored io api does) starts to give better results. I believe that both should be available as options. > * Benchmark against remote object stores from different cloud providers. The numbers I posted earlier were for reading from AWS/S3 over a 1 Gbps line. Reading from within AWS shows less improvement. I don't have an account with other cloud providers. Any help here would be appreciated. > IMO, switching `ioThreadPool` and `processThreadPool` to the reader instance level will make it more flexible. I've changed the thread pool so that it is not initialized by default but I left them as static members. Ideally, there should be a single IO thread pool that handles all the IO for a process, and the size of the pool is determined by the bandwidth of the underlying storage system. Making them per instance is not an issue though. The calling code can decide to set the same thread pool for all instances and achieve the same result. Let me update this. Also, any changes you want to make are fine with me, and the help is certainly appreciated!
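A sketch of the process-wide setup described above, assuming the `setAsyncIOThreadPool`/`setAsyncProcessThreadPool` setters referenced in the diff's comments accept an `ExecutorService`:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.parquet.hadoop.ParquetFileReader;

public class AsyncPoolSetup {
  public static void configurePools() {
    // One IO pool for the whole process, sized for storage bandwidth rather
    // than CPU count (16 is an illustrative value).
    ExecutorService ioPool = Executors.newFixedThreadPool(16, r -> {
      Thread t = new Thread(r, "parquet-io");
      t.setDaemon(true); // idle IO threads should not block JVM shutdown
      return t;
    });
    ParquetFileReader.setAsyncIOThreadPool(ioPool);

    // CPU-bound page processing scales with cores instead.
    ParquetFileReader.setAsyncProcessThreadPool(
        Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(),
            r -> new Thread(r, "parquet-process")));
  }
}
```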
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635445#comment-17635445 ] ASF GitHub Bot commented on PARQUET-2149: - wgtmac commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1318904541 It looks like this PR is complete and all comments are addressed except some outstanding ones: - Adopt the incoming Hadoop vectored io api. - Benchmark against remote object stores from different cloud providers. IMO, switching `ioThreadPool` and `processThreadPool` to the reader instance level will make it more flexible. @parthchandra Do you have any TODO list for this PR? If you are busy with some other stuff, I can continue to work on it because it is really a nice improvement. cc @shangxinli
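On the first outstanding item: the Hadoop vectored IO API lets a reader hand all column-chunk ranges to the filesystem in one call, which can coalesce and parallelize them. A minimal sketch of its shape, assuming a Hadoop release (3.3.5+) where `readVectored` and `FileRange` are available:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;

public class VectoredReadSketch {
  // One range per column chunk; the store may coalesce adjacent ranges and
  // fetch them in parallel.
  static ByteBuffer readChunk(FSDataInputStream in, long offset, int length) throws Exception {
    List<FileRange> ranges = Arrays.asList(FileRange.createFileRange(offset, length));
    in.readVectored(ranges, ByteBuffer::allocate);
    // Each range carries its own future; block only when the bytes are needed.
    return ranges.get(0).getData().get();
  }
}
```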
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580267#comment-17580267 ] ASF GitHub Bot commented on PARQUET-2149: - ggershinsky commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r946662874 ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ## @@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable { public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism"; + public static int numProcessors = Runtime.getRuntime().availableProcessors(); + + // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool + // to initialize this with their own implementations. + // Default initialization is useful only for testing + public static ExecutorService ioThreadPool = Executors.newCachedThreadPool( +r -> new Thread(r, "parquet-io")); + + // Thread pool to process pages for multiple columns in parallel. Applications should call + // setAsyncProcessThreadPool to initialize this with their own implementations. + // Default initialization is useful only for testing + public static ExecutorService processThreadPool = Executors.newCachedThreadPool( Review Comment: not sure; looks like many tests use copy/paste, rather than extension.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17576056#comment-17576056 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r927128065 ## parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java: ## @@ -61,9 +65,10 @@ private HadoopReadOptions(boolean useSignedStringMinMax, Configuration conf, FileDecryptionProperties fileDecryptionProperties) { super( -useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, -usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter, codecFactory, allocator, -maxAllocationSize, properties, fileDecryptionProperties + useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, Review Comment: That's how Intellij formatted it after I added the new parameters. Added them back. ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ## @@ -796,6 +835,30 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx this.crc = options.usePageChecksumVerification() ? new CRC32() : null; } + private boolean isAsyncIOReaderEnabled(){ +if (options.isAsyncIOReaderEnabled() ) { + if (ioThreadPool != null) { +return true; + } else { +LOG.warn("Parquet async IO is configured but the IO thread pool has not been " + + "initialized. Configuration is being ignored"); + } +} +return false; + } + + private boolean isParallelColumnReaderEnabled(){ +if (options.isParallelColumnReaderEnabled() ) { + if (processThreadPool != null) { +return true; + } else { +LOG.warn("Parallel column reading is configured but the process thread pool has " + Review Comment: Ditto ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ## @@ -1455,6 +1578,8 @@ protected PageHeader readPageHeader() throws IOException { } protected PageHeader readPageHeader(BlockCipher.Decryptor blockDecryptor, byte[] pageHeaderAAD) throws IOException { + String mode = (isAsyncIOReaderEnabled())? "ASYNC":"SYNC"; Review Comment: Done ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ## @@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx public long endPos() { return offset + length; } + +@Override +public String toString() { + return "ConsecutivePartList{" + +"offset=" + offset + +", length=" + length + +", chunks=" + chunks + +'}'; +} } + + /** + * Encapsulates the reading of a single page. 
+ */ + public class PageReader implements Closeable { +private final Chunk chunk; +private final int currentBlock; +private final BlockCipher.Decryptor headerBlockDecryptor; +private final BlockCipher.Decryptor pageBlockDecryptor; +private final byte[] aadPrefix; +private final int rowGroupOrdinal; +private final int columnOrdinal; + +//state +private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>(); +private DictionaryPage dictionaryPage = null; +private int pageIndex = 0; +private long valuesCountReadSoFar = 0; +private int dataPageCountReadSoFar = 0; + +// derived +private final PrimitiveType type; +private final byte[] dataPageAAD; +private final byte[] dictionaryPageAAD; +private byte[] dataPageHeaderAAD = null; + +private final BytesInputDecompressor decompressor; + +private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>(); + +private final LongAdder totalTimeReadOnePage = new LongAdder(); +private final LongAdder totalCountReadOnePage = new LongAdder(); +private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L); +private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder(); +private final LongAdder totalCountBlockedPagesInChunk = new LongAdder(); +private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L); + +public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor, + Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal, + BytesInputDecompressor decompressor + ) { + this.chunk = chunk; + this.currentBlock = currentBlock; + this.headerBlockDecryptor = headerBlockDecryptor; + this.pageBlockDecryptor = pageBlockDecryptor; + this.aadPrefix = aadPrefix; + this.rowGroupOrdinal =
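The metrics fields in this class follow a deliberate low-contention pattern: `LongAdder` for totals and counts updated from many threads, and `LongAccumulator` with `Long::max` for a running maximum. The same pattern in isolation:

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

public class PageReadMetrics {
  private final LongAdder totalTimeReadOnePage = new LongAdder();
  private final LongAdder totalCountReadOnePage = new LongAdder();
  private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L);

  void recordRead(long elapsedNanos) {
    // Adders stripe their state internally, so concurrent updates don't
    // contend on a single CAS the way an AtomicLong would.
    totalTimeReadOnePage.add(elapsedNanos);
    totalCountReadOnePage.increment();
    maxTimeReadOnePage.accumulate(elapsedNanos);
  }

  long avgNanos() {
    long count = totalCountReadOnePage.sum();
    return count == 0 ? 0 : totalTimeReadOnePage.sum() / count;
  }
}
```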
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557430#comment-17557430 ] ASF GitHub Bot commented on PARQUET-2149: - ggershinsky commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903697361 ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ## @@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx public long endPos() { return offset + length; } + +@Override +public String toString() { + return "ConsecutivePartList{" + +"offset=" + offset + +", length=" + length + +", chunks=" + chunks + +'}'; +} } + + /** + * Encapsulates the reading of a single page. + */ + public class PageReader implements Closeable { +private final Chunk chunk; +private final int currentBlock; +private final BlockCipher.Decryptor headerBlockDecryptor; +private final BlockCipher.Decryptor pageBlockDecryptor; +private final byte[] aadPrefix; +private final int rowGroupOrdinal; +private final int columnOrdinal; + +//state +private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>(); +private DictionaryPage dictionaryPage = null; +private int pageIndex = 0; +private long valuesCountReadSoFar = 0; +private int dataPageCountReadSoFar = 0; + +// derived +private final PrimitiveType type; +private final byte[] dataPageAAD; +private final byte[] dictionaryPageAAD; Review Comment: probably not needed
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557425#comment-17557425 ] ASF GitHub Bot commented on PARQUET-2149: - ggershinsky commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903693351 ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ## @@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx public long endPos() { return offset + length; } + +@Override +public String toString() { + return "ConsecutivePartList{" + +"offset=" + offset + +", length=" + length + +", chunks=" + chunks + +'}'; +} } + + /** + * Encapsulates the reading of a single page. + */ + public class PageReader implements Closeable { +private final Chunk chunk; +private final int currentBlock; +private final BlockCipher.Decryptor headerBlockDecryptor; +private final BlockCipher.Decryptor pageBlockDecryptor; +private final byte[] aadPrefix; +private final int rowGroupOrdinal; +private final int columnOrdinal; + +//state +private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>(); +private DictionaryPage dictionaryPage = null; +private int pageIndex = 0; +private long valuesCountReadSoFar = 0; +private int dataPageCountReadSoFar = 0; + +// derived +private final PrimitiveType type; +private final byte[] dataPageAAD; +private final byte[] dictionaryPageAAD; +private byte[] dataPageHeaderAAD = null; + +private final BytesInputDecompressor decompressor; + +private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>(); + +private final LongAdder totalTimeReadOnePage = new LongAdder(); +private final LongAdder totalCountReadOnePage = new LongAdder(); +private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L); +private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder(); +private final LongAdder totalCountBlockedPagesInChunk = new LongAdder(); +private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L); + +public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor, + Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal, + BytesInputDecompressor decompressor + ) { + this.chunk = chunk; + this.currentBlock = currentBlock; + this.headerBlockDecryptor = headerBlockDecryptor; + this.pageBlockDecryptor = pageBlockDecryptor; + this.aadPrefix = aadPrefix; + this.rowGroupOrdinal = rowGroupOrdinal; + this.columnOrdinal = columnOrdinal; + this.decompressor = decompressor; + + this.type = getFileMetaData().getSchema() +.getType(chunk.descriptor.col.getPath()).asPrimitiveType(); + + if (null != headerBlockDecryptor) { +dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, + rowGroupOrdinal, + columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar)); + } + if (null != pageBlockDecryptor) { +dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal, + columnOrdinal, 0); +dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPage, Review Comment: Yep, the `dictionaryPageAAD` is not necessary here. This is a significant code change, more than just moving the current logic of ```java public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor headerBlockDecryptor, BlockCipher.Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal) ``` I'll have a closer look at the details, but we need a unit test (proposed in my other comment) to make sure decryption works correctly with async IO and parallel column reading.
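For reference on the AAD bookkeeping being reviewed: the data-page AAD is created once per column chunk and then advanced to each page's ordinal before decryption. A sketch using `quickUpdatePageAAD`, the helper parquet-mr uses elsewhere for this; the wrapper method and parameter names are illustrative:

```java
import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
import org.apache.parquet.format.BlockCipher;

public class PageAadSketch {
  static byte[] decryptPage(BlockCipher.Decryptor pageBlockDecryptor, byte[] pageBytes,
      byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal, int pageIndex) {
    // The module AAD is created once per column chunk (page ordinal 0)...
    byte[] dataPageAAD = AesCipher.createModuleAAD(
        aadPrefix, ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0);
    // ...and advanced to the current page ordinal before each decrypt.
    AesCipher.quickUpdatePageAAD(dataPageAAD, pageIndex);
    return pageBlockDecryptor.decrypt(pageBytes, dataPageAAD);
  }
}
```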
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557375#comment-17557375 ] ASF GitHub Bot commented on PARQUET-2149: - ggershinsky commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903595526 ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ## @@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx public long endPos() { return offset + length; } + +@Override +public String toString() { + return "ConsecutivePartList{" + +"offset=" + offset + +", length=" + length + +", chunks=" + chunks + +'}'; +} } + + /** + * Encapsulates the reading of a single page. + */ + public class PageReader implements Closeable { Review Comment: maybe it can also be separated from the ParquetFileReader, this is a chance to reduce the size of the latter :)
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557373#comment-17557373 ] ASF GitHub Bot commented on PARQUET-2149: - ggershinsky commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903592957 ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ## @@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable { public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism"; + public static int numProcessors = Runtime.getRuntime().availableProcessors(); + + // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool + // to initialize this with their own implementations. + // Default initialization is useful only for testing + public static ExecutorService ioThreadPool = Executors.newCachedThreadPool( +r -> new Thread(r, "parquet-io")); + + // Thread pool to process pages for multiple columns in parallel. Applications should call + // setAsyncProcessThreadPool to initialize this with their own implementations. + // Default initialization is useful only for testing + public static ExecutorService processThreadPool = Executors.newCachedThreadPool( Review Comment: given the comment "Default initialization is useful only for testing", maybe this can be moved to the tests?
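One way to satisfy both this comment and the earlier question about pools being created when async IO is off is lazy initialization, so no threads exist until the feature is actually used. A hypothetical sketch (structure and names illustrative, not from the PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LazyIoPool {
  private static volatile ExecutorService ioThreadPool;

  // Create the default pool only on first use, so nothing is allocated when
  // async IO stays disabled. Double-checked locking keeps the fast path cheap.
  static ExecutorService ioPool() {
    if (ioThreadPool == null) {
      synchronized (LazyIoPool.class) {
        if (ioThreadPool == null) {
          ioThreadPool = Executors.newCachedThreadPool(r -> new Thread(r, "parquet-io"));
        }
      }
    }
    return ioThreadPool;
  }
}
```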
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557369#comment-17557369 ] ASF GitHub Bot commented on PARQUET-2149: - ggershinsky commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903469988 ## parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java: ## @@ -61,10 +61,7 @@ public InternalFileDecryptor(FileDecryptionProperties fileDecryptionProperties) private BlockCipher.Decryptor getThriftModuleDecryptor(byte[] columnKey) { if (null == columnKey) { // Decryptor with footer key - if (null == aesGcmDecryptorWithFooterKey) { -aesGcmDecryptorWithFooterKey = ModuleCipherFactory.getDecryptor(AesMode.GCM, footerKey); - } - return aesGcmDecryptorWithFooterKey; + return ModuleCipherFactory.getDecryptor(AesMode.GCM, footerKey); Review Comment: could you add a unit test of decryption with async IO and parallel column reader, e.g. to the https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java#L507 ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ## @@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable { public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism"; + public static int numProcessors = Runtime.getRuntime().availableProcessors(); + + // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool + // to initialize this with their own implementations. + // Default initialization is useful only for testing + public static ExecutorService ioThreadPool = Executors.newCachedThreadPool( +r -> new Thread(r, "parquet-io")); + + // Thread pool to process pages for multiple columns in parallel. Applications should call + // setAsyncProcessThreadPool to initialize this with their own implementations. + // Default initialization is useful only for testing + public static ExecutorService processThreadPool = Executors.newCachedThreadPool( Review Comment: should we be creating thread pools if the Async IO and parallel column reading are not activated? (here and in line 135) ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ## @@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx public long endPos() { return offset + length; } + +@Override +public String toString() { + return "ConsecutivePartList{" + +"offset=" + offset + +", length=" + length + +", chunks=" + chunks + +'}'; +} } + + /** + * Encapsulates the reading of a single page. + */ + public class PageReader implements Closeable { Review Comment: we already have a PageReader (interface). Could you rename this class? ## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncMultiBufferInputStream extends MultiBufferInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class); + + private int fetchIndex = 0; + private final SeekableInputStream fileInputStream; + private int readIndex = 0; + private ExecutorService threadPool; + private LinkedBlockingQueue<Future<Void>> readFutures; + private boolean closed = false; + + private LongAdder
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557307#comment-17557307 ] ASF GitHub Bot commented on PARQUET-2149: - ggershinsky commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903403821 ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ## @@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx public long endPos() { return offset + length; } + +@Override +public String toString() { + return "ConsecutivePartList{" + +"offset=" + offset + +", length=" + length + +", chunks=" + chunks + +'}'; +} } + + /** + * Encapsulates the reading of a single page. + */ + public class PageReader implements Closeable { +private final Chunk chunk; +private final int currentBlock; +private final BlockCipher.Decryptor headerBlockDecryptor; +private final BlockCipher.Decryptor pageBlockDecryptor; +private final byte[] aadPrefix; +private final int rowGroupOrdinal; +private final int columnOrdinal; + +//state +private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>(); +private DictionaryPage dictionaryPage = null; +private int pageIndex = 0; +private long valuesCountReadSoFar = 0; +private int dataPageCountReadSoFar = 0; + +// derived +private final PrimitiveType type; +private final byte[] dataPageAAD; +private final byte[] dictionaryPageAAD; +private byte[] dataPageHeaderAAD = null; + +private final BytesInputDecompressor decompressor; + +private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>(); + +private final LongAdder totalTimeReadOnePage = new LongAdder(); +private final LongAdder totalCountReadOnePage = new LongAdder(); +private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L); +private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder(); +private final LongAdder totalCountBlockedPagesInChunk = new LongAdder(); +private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L); + +public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor, + Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal, + BytesInputDecompressor decompressor + ) { + this.chunk = chunk; + this.currentBlock = currentBlock; + this.headerBlockDecryptor = headerBlockDecryptor; + this.pageBlockDecryptor = pageBlockDecryptor; + this.aadPrefix = aadPrefix; + this.rowGroupOrdinal = rowGroupOrdinal; + this.columnOrdinal = columnOrdinal; + this.decompressor = decompressor; + + this.type = getFileMetaData().getSchema() +.getType(chunk.descriptor.col.getPath()).asPrimitiveType(); + + if (null != headerBlockDecryptor) { +dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, + rowGroupOrdinal, + columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar)); + } + if (null != pageBlockDecryptor) { +dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal, + columnOrdinal, 0); +dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPage, Review Comment: sure
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557305#comment-17557305 ] ASF GitHub Bot commented on PARQUET-2149: - ggershinsky commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r90337 ## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncMultiBufferInputStream extends MultiBufferInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class); + + private int fetchIndex = 0; + private final SeekableInputStream fileInputStream; + private int readIndex = 0; + private ExecutorService threadPool; + private LinkedBlockingQueue<Future<Void>> readFutures; + private boolean closed = false; + + private LongAdder totalTimeBlocked = new LongAdder(); + private LongAdder totalCountBlocked = new LongAdder(); + private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L); + + AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream, +List<ByteBuffer> buffers) { +super(buffers); +this.fileInputStream = fileInputStream; +this.threadPool = threadPool; +readFutures = new LinkedBlockingQueue<>(buffers.size()); +if (LOG.isDebugEnabled()) { + LOG.debug("ASYNC: Begin read into buffers "); + for (ByteBuffer buf : buffers) { +LOG.debug("ASYNC: buffer {} ", buf); + } +} +fetchAll(); + } + + private void checkState() { +if (closed) { + throw new RuntimeException("Stream is closed"); +} + } + + private void fetchAll() { +checkState(); +submitReadTask(0); + } + + private void submitReadTask(int bufferNo) { +ByteBuffer buffer = buffers.get(bufferNo); +try { + readFutures.put(threadPool.submit(() -> { + readOneBuffer(buffer); + if (bufferNo < buffers.size() - 1) { +submitReadTask(bufferNo + 1); + } + return null; +}) + ); +} catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); +} + } + + private void readOneBuffer(ByteBuffer buffer) { +long startTime = System.nanoTime(); +try { + fileInputStream.readFully(buffer); + buffer.flip(); + long readCompleted = System.nanoTime(); + long timeSpent = readCompleted - 
startTime; + LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0); + fetchIndex++; +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + @Override + public boolean nextBuffer() { +checkState(); +// hack: parent constructor can call this method before this class is fully initialized. +// Just return without doing anything. +if (readFutures == null) { + return false; +} +if (readIndex < buffers.size()) { + long start = System.nanoTime(); + try { +LOG.debug("ASYNC (next): Getting next buffer"); +Future<Void> future = readFutures.take(); +future.get(); +long timeSpent = System.nanoTime() - start; +totalCountBlocked.add(1); +totalTimeBlocked.add(timeSpent); +maxTimeBlocked.accumulate(timeSpent); +LOG.debug("ASYNC (next): {}: Time blocked for read {} ns", this, timeSpent); Review Comment: should `if (LOG.isDebugEnabled()) {` be added here and in line 118? This check is performed in the constructor (line 58); `nextBuffer()` is called with higher(/same) frequency.
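The guard the comment asks for would look like this; on the hot `nextBuffer()` path it skips boxing `timeSpent` and building the varargs array whenever debug logging is off (fragment mirroring the quoted code, not new API):

```java
// Guarded form of the debug call in nextBuffer(): the isDebugEnabled() check
// avoids boxing timeSpent and allocating the varargs array on every call
// when debug logging is disabled.
if (LOG.isDebugEnabled()) {
  LOG.debug("ASYNC (next): {}: Time blocked for read {} ns", this, timeSpent);
}
```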
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556480#comment-17556480 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1160693399 @shangxinli Thank you for the review! I'll address these comments ASAP. I am reviewing the thread pool and its initialization. IMO, it is better if there is no default initialization of the pool and the calling application/framework does so explicitly. One side effect of the default initialization is that the pool is created unnecessarily even if async is off. Also, if an application shades and includes another copy of the library (or transitively, many more), then one more thread pool gets created for every version of the library included. It is probably a better idea to allow the thread pool to be assigned as a per-instance variable. The calling application can then decide to use a single pool for all instances or a new one per instance, whichever use case is better for their performance. Finally, some large-scale testing has revealed a possible resource leak. I'm looking into addressing it.
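On the possible resource leak: one plausible shape for a fix is draining and cancelling outstanding read futures on close, so in-flight reads are released even when iteration stops early. A hypothetical sketch against a `readFutures` queue like the one declared in the diff, not the PR's actual fix:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;

public class ReadFutureCleanup implements AutoCloseable {
  private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();

  @Override
  public void close() {
    // Drain the queue and cancel in-flight reads so their buffers and stream
    // positions can be released even if the caller stopped reading early.
    Future<Void> f;
    while ((f = readFutures.poll()) != null) {
      f.cancel(true);
    }
  }
}
```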
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556467#comment-17556467 ] ASF GitHub Bot commented on PARQUET-2149: - steveloughran commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r901859862

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ##
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
   public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();

Review Comment: dynamically changing the number of threads/buffer sizes/cache sizes is a recurrent source of pain in past work, as once you get to 128 core systems they often end up asking for too much of a limited resource
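One hedged way to avoid the failure mode the reviewer describes is to derive the default from the host but cap it, with a configuration override; the property name and the cap of 16 below are invented for illustration:

```java
// Sketch only: bound a core-derived default so a 128-core host does not ask
// for 128 IO threads per reader instance.
class IoDefaults {
  static int defaultIoParallelism() {
    int cores = Runtime.getRuntime().availableProcessors();
    // hypothetical override knob; not an actual parquet-mr property
    int cap = Integer.getInteger("parquet.asyncio.max.threads", 16);
    return Math.min(cores, cap);
  }
}
```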
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556109#comment-17556109 ] ASF GitHub Bot commented on PARQUET-2149: - shangxinli commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r900994027

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ##
@@ -46,12 +46,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;

Review Comment: I guess it is the IDE that does that, but let's not use a wildcard here.

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ##
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
   public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing

Review Comment: I understand we want applications to provide their own implementations, but can you share why we chose the cached thread pool instead of a fixed one as the default? I kind of feel that a lot of Parquet user scenarios come with unpredictable execution times, and we need better control over our program's resource consumption. (A comparison of the two defaults follows below.)

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ##
@@ -1387,8 +1489,13 @@ public void close() throws IOException {
  * result of the column-index based filtering when some pages might be skipped at reading.
  */
 private class ChunkListBuilder {
+    // ChunkData is backed by either a list of buffers or a list of strams

Review Comment: typo? streams?

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java: ##
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
 public long endPos() {
   return offset + length;
 }
+
+    @Override
+    public String toString() {
+      return "ConsecutivePartList{" +
+        "offset=" + offset +
+        ", length=" + length +
+        ", chunks=" + chunks +
+        '}';
+    }
 }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+    private final Chunk chunk;
+    private final int currentBlock;
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor pageBlockDecryptor;
+    private final byte[] aadPrefix;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+
+    // state
+    private final LinkedBlockingDeque> pagesInChunk = new LinkedBlockingDeque<>();
+    private DictionaryPage dictionaryPage = null;
+    private int pageIndex = 0;
+    private long valuesCountReadSoFar = 0;
+    private int dataPageCountReadSoFar = 0;
+
+    // derived
+    private final PrimitiveType type;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
+    private byte[] dataPageHeaderAAD = null;
+
+    private final BytesInputDecompressor decompressor;
+
+    private final ConcurrentLinkedQueue> readFutures = new ConcurrentLinkedQueue<>();
+
+    private final LongAdder totalTimeReadOnePage = new LongAdder();
+    private final LongAdder totalCountReadOnePage = new LongAdder();
+    private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L);
+    private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+    private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+    private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L);
+
+    public PageReader(Chunk chunk, int currentBlock, Decryptor headerBlockDecryptor,
+        Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal,
+        BytesInputDecompressor decompressor
+        ) {
+      this.chunk = chunk;
+      this.currentBlock = currentBlock;
+      this.headerBlockDecryptor = headerBlockDecryptor;
+      this.pageBlockDecryptor = pageBlockDecryptor;
+      this.aadPrefix = aadPrefix;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.decompressor = decompressor;
+
+      this.type = getFileMetaData().getSchema()
+        .getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+      if (null !=
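For context on the cached-vs-fixed question above, the two candidate defaults are both plain `java.util.concurrent` factories; the open issue is the trade-off, not the API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolChoices {
  // Cached: reuses idle threads and spawns new ones on demand, so it can grow
  // without bound under bursty load (threads die after 60s idle).
  static ExecutorService cachedDefault() {
    return Executors.newCachedThreadPool();
  }

  // Fixed: a hard cap on threads, so resource consumption is predictable;
  // excess tasks wait in the queue instead of spawning new threads.
  static ExecutorService fixedDefault(int nThreads) {
    return Executors.newFixedThreadPool(nThreads);
  }
}
```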
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555935#comment-17555935 ] ASF GitHub Bot commented on PARQUET-2149: - shangxinli commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r900993899

## parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java: ##
@@ -61,9 +65,10 @@ private HadoopReadOptions(boolean useSignedStringMinMax, Configuration conf, FileDecryptionProperties fileDecryptionProperties) {
 super(
-useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
-usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter, codecFactory, allocator,
-maxAllocationSize, properties, fileDecryptionProperties
+  useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,

Review Comment: it seems two spaces were removed.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17553586#comment-17553586 ] ASF GitHub Bot commented on PARQUET-2149: - steveloughran commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1153924743

(I could of course add those probes into the shim class, so at least that access of internals would be in one place.)
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17553585#comment-17553585 ] ASF GitHub Bot commented on PARQUET-2149: - steveloughran commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1153923501

bq. perhaps check if the ByteBufferReadable interface is implemented in the stream?

The requirement for `hasCapability("in:readbytebuffer")` to return true postdates the API; there's no way to be confident that if the probe returns false (or hasPathCapability() isn't available) the method *isn't actually there*.

See #951 for a design which will trust a `true` response, falling back to looking at the wrapped stream. Note that as it calls getWrapped() it is calling methods tagged LimitedPrivate. It shouldn't really do that... at the very least hadoop needs a PR saying "we need to do this because...", and then that tag can be changed.
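The "trust true, never trust false" policy described above, as a sketch: `hasCapability(String)` is a real Hadoop API on `FSDataInputStream`, while the surrounding method and its fallback comments paraphrase the shim design rather than shipping code.

```java
import org.apache.hadoop.fs.FSDataInputStream;

class ByteBufferReadProbe {
  static boolean definitelySupportsByteBufferReads(FSDataInputStream in) {
    // A true answer can be trusted.
    if (in.hasCapability("in:readbytebuffer")) {
      return true;
    }
    // A false answer proves nothing: the capability name postdates the API,
    // so older streams may still implement ByteBufferReadable. Callers would
    // have to attempt read(ByteBuffer) and catch UnsupportedOperationException.
    return false;
  }
}
```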
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17552370#comment-17552370 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1151467274

Sounds good. Also, perhaps check if the ByteBufferReadable interface is implemented in the stream?

> ByteBufferReadable will raise UnsupportedException if not found, there is a check for it
> https://github.com/steveloughran/fs-api-shim/blob/main/fs-api-shim-library/src/main/java/org/apache/hadoop/fs/shim/FSDataInputStreamShim.java
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17552350#comment-17552350 ] ASF GitHub Bot commented on PARQUET-2149: - steveloughran commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1151417126

I've started work on a fs-api-shim library, with the goal of "apps compiled against hadoop 3.2.0 can get access to the 3.3 and 3.4 APIs when available", either with transparent fallback (openFile()) or the ability to probe for the API before trying to invoke it: https://github.com/steveloughran/fs-api-shim

openFile takes the seek & status params, falls back to open(): https://github.com/steveloughran/fs-api-shim/blob/main/fs-api-shim-library/src/main/java/org/apache/hadoop/fs/shim/FileSystemShim.java#L87

ByteBufferReadable will raise UnsupportedException if not found, there is a check for it: https://github.com/steveloughran/fs-api-shim/blob/main/fs-api-shim-library/src/main/java/org/apache/hadoop/fs/shim/FSDataInputStreamShim.java

Vector IO SHALL be available the same way.

Adopt 3.2.0 then, and we will help give the library the ability to use the newer API calls, even stuff not yet shipped in apache releases. (I want to release this as an asf artifact with oversight by the hadoop project; lets us maintain it.)
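A sketch of the transparent-fallback idea, assuming a Hadoop 3.3+ compile-time dependency; the real fs-api-shim reaches openFile() reflectively so that it still links against 3.2.0, so treat the catch-based fallback here as an illustration of the behavior, not the shim's mechanism:

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class OpenFileShimSketch {
  static FSDataInputStream openWithFallback(FileSystem fs, Path path, FileStatus status)
      throws IOException {
    try {
      return fs.openFile(path)
          .withFileStatus(status) // saves the extra getFileStatus/HEAD round trip
          .build()
          .get(); // openFile() is asynchronous and returns a future
    } catch (UnsupportedOperationException e) {
      return fs.open(path); // older filesystems: plain open()
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(e);
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    }
  }
}
```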
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17552331#comment-17552331 ] ASF GitHub Bot commented on PARQUET-2149: - steveloughran commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r893760667

## parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java: ##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A bare minimum implementation of a {@link java.io.SequenceInputStream} that wraps an
+ * ordered collection of ByteBufferInputStreams.
+ *
+ * This class, as implemented, is intended only for a specific use in the ParquetFileReader and
+ * throws {@link UnsupportedOperationException} in unimplemented methods to catch any unintended
+ * use in other cases.
+ *
+ * Even though this class is derived from ByteBufferInputStream it explicitly does not support any
+ * byte buffer related methods like slice. It does, however, support sliceBuffers, which is a
+ * curious case of reading data from underlying streams.
+ *
+ * Even though this class changes the state of the underlying streams (by reading from them)
+ * it does not own them, and so the close method does not close the streams. To avoid resource
+ * leaks the calling code should close the underlying streams.
+ */
+public class SequenceByteBufferInputStream extends ByteBufferInputStream {
+
+  Collection collection;
+  Iterator iterator;
+  ByteBufferInputStream current;
+  long position = 0;
+
+  @Override
+  public String toString() {
+    return "SequenceByteBufferInputStream{" +
+      "collection=" + collection +
+      ", current=" + current +
+      ", position=" + position +
+      '}';
+  }
+
+  public SequenceByteBufferInputStream(Collection collection) {
+    this.collection = collection;
+    iterator = collection.iterator();
+    current = iterator.hasNext() ? iterator.next() : null;
+    if (current == null) {
+      throw new UnsupportedOperationException(
+        "Initializing SequenceByteBufferInputStream with an empty collection is not supported");
+    }
+  }
+
+  @Override
+  public long position() {
+    return position;
+  }
+
+  @Override
+  public int read(ByteBuffer out) {

Review Comment: good to know.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17542147#comment-17542147 ] ASF GitHub Bot commented on PARQUET-2149: - theosib-amazon commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1137610598

That batch reader in Presto reminds me of some of the experimental changes I made in Trino. I modified PrimitiveColumnReader to work out how many of each data item it needs to read from the data source and request all of them at once in an array. This doubled the performance of some TPCDS queries. This is why I have array access methods planned for ParquetMR (https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing). Requesting data in bulk saves a lot on function call overhead for each data item.
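A sketch of the bulk-read contrast being described; `ValueReader`, `readInt`, and `readInts` are hypothetical names for illustration, not the parquet-mr API:

```java
// One interface, two access styles.
interface ValueReader {
  int readInt();                              // one virtual call per value
  void readInts(int[] dst, int off, int len); // one call per batch
}

class BatchDecode {
  static int[] decodeBatch(ValueReader reader, int batchSize) {
    int[] out = new int[batchSize];
    // A single call amortizes call overhead and bounds checks across the
    // whole batch -- the "function call overhead for each data item" saving.
    reader.readInts(out, 0, batchSize);
    return out;
  }
}
```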
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17542117#comment-17542117 ] ASF GitHub Bot commented on PARQUET-2149: - sunchao commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1137568979

> and yes, updating parquet dependencies would be good, hadoop 3.3.0 should be the baseline.

+1 on upgrading to 3.3.0, although currently parquet is using 2.10.1 as a provided dependency, and we need to make sure it continues to work with hadoop 2.x.

> It may be because I was using Spark's vectorized parquet decoding which is an order of magnitude faster than parquet library's row by row decoding (see [Spark benchmarks](https://github.com/apache/spark/blob/master/sql/core/benchmarks/DataSourceReadBenchmark-results.txt)). If trino is not doing vectorized decoding (I took a very quick look and I don't think it is), I would suggest you can look into that next. All the cool kids are doing it.

Presto already has a [batch reader](https://github.com/prestodb/presto/tree/master/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader), but it seems the feature is not in Trino yet. The batch reader did help a lot to reduce the CPU load. See [the slides](https://github.com/prestodb/presto/wiki/files/presto-meetup-oct-2019/uber.pdf).
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541757#comment-17541757 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136552664

> This is interesting, because when I did profiling of Trino, I found that although I/O (from S3, over the network no less) was significant, even more time was spent in compute. Maybe you're getting improved performance because you're increasing _parallelism_ between I/O and compute.

It may be because I was using Spark's vectorized parquet decoding, which is an order of magnitude faster than the parquet library's row-by-row decoding (see [Spark benchmarks](https://github.com/apache/spark/blob/master/sql/core/benchmarks/DataSourceReadBenchmark-results.txt)). If trino is not doing vectorized decoding (I took a very quick look and I don't think it is), I would suggest you look into that next. All the cool kids are doing it.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541754#comment-17541754 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136542868

> So it sounds like you're optimizing one layer of processing, and I'm optimizing the next layer up, and it's kind of a coincidence that we're touching some of the same classes just because code reuse has been possible here.

Yeah, kind of cool :)
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541752#comment-17541752 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136531385

BTW, adding more tests for the InputStream implementations.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541750#comment-17541750 ] ASF GitHub Bot commented on PARQUET-2149: - theosib-amazon commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136528207

> > Is byte (and arrays and buffers of bytes) the only datatype you support? My PR is optimizing code paths that pull ints, longs, and other sizes out of the data buffers. Are those not necessary for any of the situations where you're using an async buffer?
>
> The input stream API is generally unaware of the datatypes of its contents and so those are the only apis I use. The other reason is that the ParquetFileReader returns Pages which basically contain metadata and ByteBuffers of _compressed_ data. The decompression and decoding into types comes much later in a downstream thread.
>
> For your PR, I don't think the AsyncMultibufferInputStream is ever going to be in play in the paths you're optimizing. But just in case it is, your type-aware methods will work as is because AsyncMultibufferInputStream is derived from MultiBufferInputStream and will inherit those methods.

I'm still learning Parquet's structure. So it sounds to me like these buffer input streams are used twice: once to get data and decompress it, and then once again to decode it into data structures. Is that correct? So it sounds like you're optimizing one layer of processing, and I'm optimizing the next layer up, and it's kind of a coincidence that we're touching some of the same classes just because code reuse has been possible here.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541749#comment-17541749 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136528026

> Latency is the killer; in an HTTP request you want to read enough but not discard data or break an HTTP connection if the client suddenly does a seek() or readFully() somewhere else. File listings, existence checks, etc.
> That'd be great. now, if you could also handle requesting different columns in parallel and processing them out of order.

I do. The Parquet file reader API that reads row groups in sync mode reads all columns in sequence. In async mode, it fires off a task for every column, blocking only to read the first page of each column before returning. This part also uses a different thread pool from the IO tasks, so that IO tasks never wait for lack of available threads in the pool. (A sketch of this two-pool layout follows below.)

> be good to think about vectored IO.

I think I know how to integrate this PR with the vectored IO, but this is only after a cursory look.

> and yes, updating parquet dependencies would be good, hadoop 3.3.0 should be the baseline.

Who can drive this (presumably) non-trivial change? I myself have no karma points :(

> just sketched out my thoughts on this. I've played with some of this in my own branch. I think the next step would be for me to look at the benchmark code to make it targetable elsewhere.
> https://docs.google.com/document/d/1y9oOSYbI6fFt547zcQJ0BD8VgvJWdyHBveaiCHzk79k/

This is great. I now have much more context of where you are coming from (and going to)!
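A sketch of that two-pool layout under stated assumptions; the class name and pool sizes are illustrative, not the PR's code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Column-processing tasks go to one pool and raw storage reads to another,
// so a burst of processing work can never starve the IO tasks of threads.
class TwoPoolLayout {
  final ExecutorService processPool = Executors.newFixedThreadPool(4);
  final ExecutorService ioPool = Executors.newFixedThreadPool(4);

  List<Future<Void>> readRowGroup(List<Callable<Void>> columnTasks) {
    List<Future<Void>> futures = new ArrayList<>();
    for (Callable<Void> task : columnTasks) {
      // Each column is pipelined independently; inside the task, the actual
      // storage reads would be submitted to ioPool.
      futures.add(processPool.submit(task));
    }
    return futures;
  }
}
```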
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541748#comment-17541748 ] ASF GitHub Bot commented on PARQUET-2149: - theosib-amazon commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136526711

This is interesting, because when I did profiling of Trino, I found that although I/O (from S3, over the network no less) was significant, even more time was spent in compute. Maybe you're getting improved performance because you're increasing *parallelism* between I/O and compute.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541725#comment-17541725 ] ASF GitHub Bot commented on PARQUET-2149: - steveloughran commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136465506

> At this point the bottlenecks in parquet begin to move towards decompression and decoding but IO remains the slowest link in the chain.

Latency is the killer; in an HTTP request you want to read enough but not discard data or break an HTTP connection if the client suddenly does a seek() or readFully() somewhere else. File listings, existence checks, etc.

> One thing we get with my PR is that the ParquetFileReader had assumptions built in that all data must be read before downstream can proceed. Some of my changes are related to removing these assumptions and ensuring that downstream processing does not block until an entire column is read so we get efficient pipelining.

That'd be great. Now, if you could also handle requesting different columns in parallel and processing them out of order.

> What does the 128 MB block mean? Is this the amount prefetched for a stream? The read API does not block until the entire block is filled, I presume.

This was the abfs client set to do four GET requests of 128MB each. This would be awful for column stores where smaller ranges are often requested/processed before another seek is made, but quite often parquet does do more back-to-back reads than just one read/readFully request.

> With my PR, parquet IO is reading 8MB at a time (default) and downstream is processing 1MB at a time (default) and several such streams (one per column) are in progress at the same time. Hopefully, this read pattern would work with the prefetch.

Be good to think about vectored IO (a hedged sketch follows after this comment). And yes, updating parquet dependencies would be good; hadoop 3.3.0 should be the baseline.

Just sketched out my thoughts on this. I've played with some of this in my own branch. I think the next step would be for me to look at the benchmark code to make it targetable elsewhere: https://docs.google.com/document/d/1y9oOSYbI6fFt547zcQJ0BD8VgvJWdyHBveaiCHzk79k/
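For reference, the vectored-IO API alluded to above shipped later, in Hadoop 3.3.5, as `PositionedReadable.readVectored()`; the ranges below are illustrative column-chunk offsets, not real file layout:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;

class VectoredReadSketch {
  static void readTwoChunks(FSDataInputStream in) throws Exception {
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(4 * 1024 * 1024, 1024 * 1024),
        FileRange.createFileRange(64 * 1024 * 1024, 2 * 1024 * 1024));
    // The store may coalesce nearby ranges and fetch them in parallel.
    in.readVectored(ranges, ByteBuffer::allocate);
    for (FileRange r : ranges) {
      ByteBuffer data = r.getData().get(); // each range completes independently
      // ... hand `data` to decompression/decoding downstream ...
    }
  }
}
```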
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541717#comment-17541717 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136439133

> Is byte (and arrays and buffers of bytes) the only datatype you support? My PR is optimizing code paths that pull ints, longs, and other sizes out of the data buffers. Are those not necessary for any of the situations where you're using an async buffer?

The input stream API is generally unaware of the datatypes of its contents, and so those are the only apis I use. The other reason is that the ParquetFileReader returns Pages which basically contain metadata and ByteBuffers of _compressed_ data. The decompression and decoding into types comes much later in a downstream thread.

For your PR, I don't think the AsyncMultibufferInputStream is ever going to be in play in the paths you're optimizing. But just in case it is, your type-aware methods will work as is because AsyncMultibufferInputStream is derived from MultiBufferInputStream and will inherit those methods.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541699#comment-17541699 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136427603

> thanks. that means you are current with all shipping improvements. the main one extra is to use openFile(), passing in length and requesting randomio. this guarantees ranged GET requests and cuts the initial HEAD probe for existence/size of file.

By `openFile()` do you mean `FileSystem.openFileWithOptions(Path, OpenFileParameters)`? While looking I realized that Parquet builds with a [much older version of hadoop](https://github.com/apache/parquet-mr/blob/a2da156b251d13bce1fa81eb95b555da04880bc1/pom.xml#L79). (A sketch of the openFile() call follows below.)

> > have you benchmarked this change with abfs or google gcs connectors to see what difference it makes there?
> > No I have not. Would love help from anyone in the community with access to these. I only have access to S3.
>
> that I have. FWIW, with the right tuning of abfs prefetch (4 threads, 128 MB blocks) i can get full FTTH link rate from a remote store; 700 mbit/s. that's to the base station. once you add wifi the bottlenecks move.

Wow! That is nearly as fast as a local HDD. At this point the bottlenecks in parquet begin to move towards decompression and decoding, but IO remains the slowest link in the chain. One thing we get with my PR is that the ParquetFileReader had assumptions built in that all data must be read before downstream can proceed. Some of my changes are related to removing these assumptions and ensuring that downstream processing does not block until an entire column is read, so we get efficient pipelining.

What does the 128 MB block mean? Is this the amount prefetched for a stream? The read API does not block until the entire block is filled, I presume. With my PR, parquet IO is reading 8MB at a time (default) and downstream is processing 1MB at a time (default), and several such streams (one per column) are in progress at the same time. Hopefully, this read pattern would work with the prefetch.
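A sketch of the openFile() call being discussed, passing the known file length and a random-IO read policy so that stores like S3A can skip the HEAD probe and issue ranged GETs. The `fs.option.openfile.*` keys were standardized in Hadoop 3.3.5, after this exchange; on older releases the same effect needs store-specific options.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class OpenForRandomIO {
  static FSDataInputStream open(FileSystem fs, Path path, long knownLength)
      throws IOException {
    try {
      return fs.openFile(path)
          .opt("fs.option.openfile.read.policy", "random")          // ranged GETs
          .opt("fs.option.openfile.length", Long.toString(knownLength)) // no HEAD probe
          .build()
          .get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(e);
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    }
  }
}
```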
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541618#comment-17541618 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r880747716

## parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java: ##
@@ -1,14 +1,14 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- *

Review Comment: ack
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541600#comment-17541600 ] ASF GitHub Bot commented on PARQUET-2149: - theosib-amazon commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136123313 Is byte (and arrays and buffers of bytes) the only datatype you support? My PR is optimizing code paths that pull ints, longs, and other sizes out of the data buffers. Are those not necessary for any of the situations where you're using an async buffer? > Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541592#comment-17541592 ] ASF GitHub Bot commented on PARQUET-2149: - theosib-amazon commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r880691495 ## parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java: ## @@ -1,14 +1,14 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * Review Comment: It looks like you're running into the same bug in IntelliJ as I am, where it makes whitespace changes without authorization. Would you mind commenting on this bug report that I filed? https://youtrack.jetbrains.com/issue/IDEA-293197/IntelliJ-makes-unauthorized-changes-to-whitespace-in-comments-wo > Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541359#comment-17541359 ] ASF GitHub Bot commented on PARQUET-2149: - steveloughran commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1135585289 > I was working with s3a > Spark 3.2.1 > Hadoop (hadoop-aws) 3.3.2 > AWS SDK 1.11.655 thanks, that means you are current with all shipping improvements. the main one extra is to use openFile(), passing in length and requesting random IO. this guarantees ranged GET requests and cuts the initial HEAD probe for existence/size of the file. >> have you benchmarked this change with abfs or google gcs connectors to see what difference it makes there? > No I have not. Would love help from anyone in the community with access to these. I only have access to S3. that I have. FWIW, with the right tuning of abfs prefetch (4 threads, 128 MB blocks) i can get full FTTH link rate from a remote store; 700 Mbit/s. that's to the base station. once you add wifi the bottlenecks move. > Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)
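A sketch of the kind of abfs prefetch tuning steveloughran mentions (4 prefetch threads, 128 MB read blocks). The key names below are assumptions drawn from the hadoop-azure connector and should be verified against the release in use:

```java
import org.apache.hadoop.conf.Configuration;

class AbfsPrefetchTuningSketch {
  static Configuration tuned() {
    Configuration conf = new Configuration();
    // Assumed hadoop-azure keys: depth of the read-ahead queue (prefetch threads)
    // and the size of each read request in bytes (128 MB).
    conf.setInt("fs.azure.readaheadqueue.depth", 4);
    conf.setInt("fs.azure.read.request.size", 128 * 1024 * 1024);
    return conf;
  }
}
```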
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541250#comment-17541250 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1135366352 @steveloughran thank you very much for taking the time to review and provide feedback! > 1. whose s3 client was used for testing here - if the s3a one, which hadoop release? I was working with s3a: Spark 3.2.1, Hadoop (hadoop-aws) 3.3.2, AWS SDK 1.11.655 > 2. the azure abfs and gcs connectors do async prefetching of the next block, but are simply assuming that code will read sequentially; if there is another seek/readFully to a new location, those prefetches will be abandoned. there is work in s3a to do prefetching here with caching, so as to reduce the penalty of backwards seeks. https://issues.apache.org/jira/browse/HADOOP-18028 I haven't worked with abfs or gcs. If the connectors do async pre-fetching, that would be great. Essentially, the time the Parquet reader would have to block in the file system API would be reduced substantially. In such a case, we could turn the async reader on/off and rerun the benchmark to compare. From past experience with MapR-FS, which had very aggressive read-ahead in its HDFS client, I would still expect better Parquet speeds. The fact that the prefetch is turned off when a seek occurs is usual behaviour, but we may see no benefit from the connector in that case. So a combination of async reader and async connector might end up being a great solution (maybe at a slightly greater CPU utilization). We would still have to do a benchmark to see the real effect. The async version in this PR takes care of the sequential read requirement by a) opening a new stream for each column and ensuring every column is read sequentially (footers are read using a separate stream; except for the footer, no other stream ever seeks to a new location), and b) predetermining the amount of data to be read, so there is never a read-ahead that is discarded. > > hadoop is adding a vectored IO api intended for libraries like orc and parquet to be able to use, where the application provides an unordered list of ranges, a bytebuffer supplier and gets back a list of futures to wait for. the base implementation simply reads using readFully API. s3a (and later abfs) will do full async retrieval itself, using the http connection pool. https://issues.apache.org/jira/browse/HADOOP-18103 > > both vectored io and s3a prefetching will ship this summer in hadoop 3.4.0. i don't see this change conflicting with this, though they may obsolete a lot of it. Yes, I became aware of this recently. I'm discussing integration of these efforts in a separate channel. At the moment I see no conflict, but have yet to determine how much of this async work would need to be changed. I suspect we may be able to eliminate or vastly simplify `AsyncMultiBufferInputStream`. > have you benchmarked this change with abfs or google gcs connectors to see what difference it makes there? No I have not. Would love help from anyone in the community with access to these. I only have access to S3. 
> Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader >
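A minimal sketch (not the PR's actual code) of the read pattern parthchandra describes above: one dedicated stream per column that only ever reads forward, with the total bytes known up front so no read-ahead is wasted. The 8 MB chunk size and the per-column streams come from the discussion; `enqueueForDecoding` is a hypothetical downstream handoff:

```java
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

class PerColumnReadSketch {
  static final int CHUNK = 8 << 20; // 8 MB read size (the PR's default)

  static void readColumns(InputFile file, List<ColumnChunkMetaData> columns) throws Exception {
    for (ColumnChunkMetaData column : columns) {
      try (SeekableInputStream in = file.newStream()) { // one stream per column
        in.seek(column.getStartingPos());               // the only seek on this stream
        long remaining = column.getTotalSize();         // predetermined read amount
        while (remaining > 0) {
          ByteBuffer buf = ByteBuffer.allocate((int) Math.min(CHUNK, remaining));
          in.readFully(buf);                            // strictly sequential forward reads
          buf.flip();
          remaining -= buf.remaining();
          enqueueForDecoding(buf);                      // hypothetical: hand off downstream
        }
      }
    }
  }

  static void enqueueForDecoding(ByteBuffer buf) { /* placeholder */ }
}
```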
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541248#comment-17541248 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r879902468 ## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncMultiBufferInputStream extends MultiBufferInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class); + + private int fetchIndex = 0; + private final SeekableInputStream fileInputStream; + private int readIndex = 0; + private ExecutorService threadPool; + private LinkedBlockingQueue<Future<Void>> readFutures; + private boolean closed = false; + + private LongAdder totalTimeBlocked = new LongAdder(); + private LongAdder totalCountBlocked = new LongAdder(); + private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L); + + AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream, +List<ByteBuffer> buffers) { +super(buffers); +this.fileInputStream = fileInputStream; +this.threadPool = threadPool; +readFutures = new LinkedBlockingQueue<>(buffers.size()); +if (LOG.isDebugEnabled()) { + LOG.debug("ASYNC: Begin read into buffers "); + for (ByteBuffer buf : buffers) { +LOG.debug("ASYNC: buffer {} ", buf); + } +} +fetchAll(); + } + + private void checkState() { +if (closed) { + throw new RuntimeException("Stream is closed"); +} + } + + private void fetchAll() { +checkState(); +submitReadTask(0); + } + + private void submitReadTask(int bufferNo) { +ByteBuffer buffer = buffers.get(bufferNo); +try { + readFutures.put(threadPool.submit(() -> { + readOneBuffer(buffer); + if (bufferNo < buffers.size() - 1) { +submitReadTask(bufferNo + 1); + } + return null; +}) + ); +} catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); +} + } + + private void readOneBuffer(ByteBuffer buffer) { +long startTime = System.nanoTime(); +try { + fileInputStream.readFully(buffer); + buffer.flip(); + long readCompleted = System.nanoTime(); + long timeSpent = readCompleted - startTime; + LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0); + long putStart = System.nanoTime(); + long putCompleted = System.nanoTime(); + LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}", Review Comment: It doesn't. Sorry, this got left behind after a cleanup. ## parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java: ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND,
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541021#comment-17541021 ] ASF GitHub Bot commented on PARQUET-2149: - steveloughran commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r879586630 ## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncMultiBufferInputStream extends MultiBufferInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class); + + private int fetchIndex = 0; + private final SeekableInputStream fileInputStream; + private int readIndex = 0; + private ExecutorService threadPool; + private LinkedBlockingQueue<Future<Void>> readFutures; + private boolean closed = false; + + private LongAdder totalTimeBlocked = new LongAdder(); + private LongAdder totalCountBlocked = new LongAdder(); + private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L); + + AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream, +List<ByteBuffer> buffers) { +super(buffers); +this.fileInputStream = fileInputStream; +this.threadPool = threadPool; +readFutures = new LinkedBlockingQueue<>(buffers.size()); +if (LOG.isDebugEnabled()) { + LOG.debug("ASYNC: Begin read into buffers "); + for (ByteBuffer buf : buffers) { +LOG.debug("ASYNC: buffer {} ", buf); + } +} +fetchAll(); + } + + private void checkState() { +if (closed) { + throw new RuntimeException("Stream is closed"); +} + } + + private void fetchAll() { +checkState(); +submitReadTask(0); + } + + private void submitReadTask(int bufferNo) { +ByteBuffer buffer = buffers.get(bufferNo); +try { + readFutures.put(threadPool.submit(() -> { + readOneBuffer(buffer); + if (bufferNo < buffers.size() - 1) { +submitReadTask(bufferNo + 1); + } + return null; +}) + ); +} catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); +} + } + + private void readOneBuffer(ByteBuffer buffer) { +long startTime = System.nanoTime(); +try { + fileInputStream.readFully(buffer); + buffer.flip(); + long readCompleted = System.nanoTime(); + long timeSpent = readCompleted - startTime; + LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0); + long putStart = System.nanoTime(); + long putCompleted = System.nanoTime(); + LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}", Review Comment: how does this work? ## parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java: ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541016#comment-17541016 ] ASF GitHub Bot commented on PARQUET-2149: - steveloughran commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1134843705 1. whose s3 client was used for testing here - if the s3a one, which hadoop release? 2. the azure abfs and gcs connectors do async prefetching of the next block, but are simply assuming that code will read sequentially; if there is another seek/readFully to a new location, those prefetches will be abandoned. there is work in s3a to do prefetching here with caching, so as to reduce the penalty of backwards seeks. https://issues.apache.org/jira/browse/HADOOP-18028 hadoop is adding a vectored IO api intended for libraries like orc and parquet to be able to use, where the application provides an unordered list of ranges, a bytebuffer supplier and gets back a list of futures to wait for. the base implementation simply reads using readFully API. s3a (and later abfs) will do full async retrieval itself, using the http connection pool. https://issues.apache.org/jira/browse/HADOOP-18103 both vectored io and s3a prefetching will ship this summer in hadoop 3.4.0. i don't see this change conflicting with this, though they may obsolete a lot of it. have you benchmarked this change with abfs or google gcs connectors to see what difference it makes there? > Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)
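A minimal sketch of the vectored IO API described above, based on the HADOOP-18103 design as shipped in Hadoop 3.3.5+/3.4.0; exact class and method names may differ between releases, and `fs`, `path`, and the offsets are placeholders:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class VectoredReadSketch {
  static void readRanges(FileSystem fs, Path path) throws Exception {
    // The application hands over an unordered list of ranges plus a buffer
    // supplier, and gets back a future per range to wait on.
    List<FileRange> ranges = new ArrayList<>();
    ranges.add(FileRange.createFileRange(0, 1024));         // hypothetical offsets
    ranges.add(FileRange.createFileRange(4_194_304, 8192));
    try (FSDataInputStream in = fs.open(path)) {
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().get(); // completes when that range is read
      }
    }
  }
}
```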
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17540203#comment-17540203 ] ASF GitHub Bot commented on PARQUET-2149: - kbendick commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r878332111 ## parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java: ## @@ -37,4 +39,9 @@ public interface PageReader { * @return the next page in that chunk or null if after the last page */ DataPage readPage(); + + /** + * Close the page reader. By default it is no-op. + */ + default void close() throws IOException {} Review Comment: Cool. A lot of times I check if an instance is `Closeable` and then call close. > Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)
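The caller-side pattern kbendick describes, as a self-contained sketch; having `PageReader` extend `Closeable` would make the capability explicit and allow try-with-resources instead of this check:

```java
import java.io.Closeable;
import java.io.IOException;

class CloseableCheckSketch {
  // Close only if the instance advertises Closeable; a no-op otherwise.
  static void closeIfPossible(Object reader) throws IOException {
    if (reader instanceof Closeable) {
      ((Closeable) reader).close();
    }
  }
}
```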
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17540200#comment-17540200 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r877587287 ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java: ## @@ -21,12 +21,15 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.HashMap; +import java.util.Iterator; Review Comment: Yeah. This and a bunch of other imports are not used. Removing them all. ## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncMultiBufferInputStream extends MultiBufferInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class); + + final SeekableInputStream fileInputStream; + int fetchIndex = 0; + int readIndex = 0; + ExecutorService threadPool; + LinkedBlockingQueue> readFutures; + boolean closed = false; + + LongAdder totalTimeBlocked = new LongAdder(); + LongAdder totalCountBlocked = new LongAdder(); + LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L); Review Comment: Done ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java: ## @@ -21,12 +21,15 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.PrimitiveIterator; import java.util.Queue; +import java.util.concurrent.LinkedBlockingDeque; +import org.apache.parquet.ParquetRuntimeException; Review Comment: Removed > Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) 
decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539724#comment-17539724 ] ASF GitHub Bot commented on PARQUET-2149: - kazuyukitanimura commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r877368066 ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java: ## @@ -21,12 +21,15 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.PrimitiveIterator; import java.util.Queue; +import java.util.concurrent.LinkedBlockingDeque; +import org.apache.parquet.ParquetRuntimeException; Review Comment: Also I am not sure if `ParquetRuntimeException` is used... ## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java: ## @@ -21,12 +21,15 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.HashMap; +import java.util.Iterator; Review Comment: I can't seem to find where `Iterator` is used... Should we remove it if that is the case? ## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncMultiBufferInputStream extends MultiBufferInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class); + + final SeekableInputStream fileInputStream; + int fetchIndex = 0; + int readIndex = 0; + ExecutorService threadPool; + LinkedBlockingQueue<Future<Void>> readFutures; + boolean closed = false; + + LongAdder totalTimeBlocked = new LongAdder(); + LongAdder totalCountBlocked = new LongAdder(); + LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L); Review Comment: Does it make sense to change these to `private` if they are not accessed from other places? 
## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java: ## @@ -21,12 +21,15 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.PrimitiveIterator; import java.util.Queue; Review Comment: I think `Queue` is no longer used with this change. Should we remove the import? > Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539121#comment-17539121 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130698923 @theosib-amazon I applied my PR on top of your PR, ran through some tests using Spark, and hit no issues. (All unit tests passed as well). > Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539119#comment-17539119 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876197602 ## parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java: ## @@ -37,4 +39,9 @@ public interface PageReader { * @return the next page in that chunk or null if after the last page */ DataPage readPage(); + + /** + * Close the page reader. By default it is no-op. + */ + default void close() throws IOException {} Review Comment: Sure. ## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java: ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncMultiBufferInputStream extends MultiBufferInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class); + + final SeekableInputStream fileInputStream; + int fetchIndex = 0; + int readIndex = 0; + ExecutorService threadPool; + LinkedBlockingQueue<Future<Void>> readFutures; + boolean closed = false; + Exception ioException; + + LongAdder totalTimeBlocked = new LongAdder(); + LongAdder totalCountBlocked = new LongAdder(); + LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L); + + AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream, +List<ByteBuffer> buffers) { +super(buffers); +this.fileInputStream = fileInputStream; +this.threadPool = threadPool; +readFutures = new LinkedBlockingQueue<>(buffers.size()); +if (LOG.isDebugEnabled()) { + LOG.debug("ASYNC: Begin read into buffers "); + for (ByteBuffer buf : buffers) { +LOG.debug("ASYNC: buffer {} ", buf); + } +} +fetchAll(); + } + + private void checkState() { +if (closed) { + throw new RuntimeException("Stream is closed"); +} +synchronized (this) { + if (ioException != null) { +throw new RuntimeException(ioException); + } +} + } + + private void fetchAll() { +checkState(); +submitReadTask(0); + } + + private void submitReadTask(int bufferNo) { +ByteBuffer buffer = buffers.get(bufferNo); +try { + readFutures.put(threadPool.submit(() -> { + readOneBuffer(buffer); + if (bufferNo < buffers.size() - 1) { +submitReadTask(bufferNo + 1); + } + return null; +}) + ); +} catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); +} + } + + private void readOneBuffer(ByteBuffer buffer) { +long startTime = System.nanoTime(); +try { + fileInputStream.readFully(buffer); + buffer.flip(); + long readCompleted = System.nanoTime(); + long timeSpent = readCompleted - startTime; + LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0); + long putStart = System.nanoTime(); + long putCompleted = System.nanoTime(); + LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}", +(putCompleted - putStart) / 1000.0); + fetchIndex++; +} catch (IOException e) { + // Save the exception so that the calling thread can check if something went wrong. + // checkState will throw an exception if the read task has failed. + synchronized(this) { +ioException = e; + } + throw new RuntimeException(e); +} + } + + @Override + public boolean nextBuffer() { +
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538991#comment-17538991 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130327546 > @parthchandra One thing that confuses me a bit is that these buffers have only ByteBuffer inside them. There's no actual I/O, so it's not possible to block. Do you have subclasses that provide some sort of access to real I/O? Good point. `MultiBufferInputStream` is constructed using buffers that have been filled already. `AsyncMultiBufferInputStream` takes an input stream as a parameter in the constructor and performs the IO itself. In `ByteBufferInputStream` I added ``` public static ByteBufferInputStream wrapAsync(ExecutorService threadPool, SeekableInputStream fileInputStream, List<ByteBuffer> buffers) { return new AsyncMultiBufferInputStream(threadPool, fileInputStream, buffers); } ``` > Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)
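A sketch of how the `wrapAsync` factory described above would be used: the caller allocates the buffers that will receive the column chunk, and the async stream fills them in the background. `chunkSize`, `BUFFER_SIZE`, and the pool lifecycle are assumptions for illustration:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.SeekableInputStream;

class WrapAsyncSketch {
  static final int BUFFER_SIZE = 8 << 20; // 8 MB, the PR's default read size

  static ByteBufferInputStream open(SeekableInputStream file, long chunkSize) {
    ExecutorService ioPool = Executors.newCachedThreadPool(); // normally shared/managed elsewhere
    // Pre-size the buffers for the whole chunk; wrapAsync starts filling them
    // on the pool immediately, and reads block only on buffers not yet filled.
    List<ByteBuffer> buffers = new ArrayList<>();
    for (long rem = chunkSize; rem > 0; rem -= BUFFER_SIZE) {
      buffers.add(ByteBuffer.allocate((int) Math.min(BUFFER_SIZE, rem)));
    }
    return ByteBufferInputStream.wrapAsync(ioPool, file, buffers);
  }
}
```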
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538983#comment-17538983 ] ASF GitHub Bot commented on PARQUET-2149: - kbendick commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876172260 ## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java: ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncMultiBufferInputStream extends MultiBufferInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class); + + final SeekableInputStream fileInputStream; + int fetchIndex = 0; + int readIndex = 0; + ExecutorService threadPool; + LinkedBlockingQueue<Future<Void>> readFutures; + boolean closed = false; + Exception ioException; + + LongAdder totalTimeBlocked = new LongAdder(); + LongAdder totalCountBlocked = new LongAdder(); + LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L); + + AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream, +List<ByteBuffer> buffers) { +super(buffers); +this.fileInputStream = fileInputStream; +this.threadPool = threadPool; +readFutures = new LinkedBlockingQueue<>(buffers.size()); +if (LOG.isDebugEnabled()) { + LOG.debug("ASYNC: Begin read into buffers "); + for (ByteBuffer buf : buffers) { +LOG.debug("ASYNC: buffer {} ", buf); + } +} +fetchAll(); + } + + private void checkState() { +if (closed) { + throw new RuntimeException("Stream is closed"); +} +synchronized (this) { + if (ioException != null) { +throw new RuntimeException(ioException); + } +} + } + + private void fetchAll() { +checkState(); +submitReadTask(0); + } + + private void submitReadTask(int bufferNo) { +ByteBuffer buffer = buffers.get(bufferNo); +try { + readFutures.put(threadPool.submit(() -> { + readOneBuffer(buffer); + if (bufferNo < buffers.size() - 1) { +submitReadTask(bufferNo + 1); + } + return null; +}) + ); +} catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); +} + } + + private void readOneBuffer(ByteBuffer buffer) { +long startTime = System.nanoTime(); +try { + fileInputStream.readFully(buffer); + buffer.flip(); + long readCompleted = System.nanoTime(); + long timeSpent = readCompleted - startTime; + LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0); + long putStart = System.nanoTime(); + long putCompleted = System.nanoTime(); + LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}", +(putCompleted - putStart) / 1000.0); + fetchIndex++; +} catch (IOException e) { + // Save the exception so that the calling thread can check if something went wrong. + // checkState will throw an exception if the read task has failed. + synchronized(this) { +ioException = e; + } + throw new RuntimeException(e); +} + } + + @Override + public boolean nextBuffer() { +checkState(); +// hack: parent constructor can call this method before this class is fully initialized. +// Just return without doing anything. +if (readFutures == null) { + return false; +} Review Comment: Because `checkState` has synchronization, would it be safe to move the `checkState` before this somehow or add some kind of less expensive boolean check that we can set
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538979#comment-17538979 ] ASF GitHub Bot commented on PARQUET-2149: - kbendick commented on code in PR #968: URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876165378 ## parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java: ## @@ -37,4 +39,9 @@ public interface PageReader { * @return the next page in that chunk or null if after the last page */ DataPage readPage(); + + /** + * Close the page reader. By default it is no-op. + */ + default void close() throws IOException {} Review Comment: Should we add `Closeable` as an implemented interface? > Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538969#comment-17538969 ] ASF GitHub Bot commented on PARQUET-2149: - theosib-amazon commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130275378 @parthchandra One thing that confuses me a bit is that these buffers have only ByteBuffer inside them. There's no actual I/O, so it's not possible to block. Do you have subclasses that provide some sort of access to real I/O? > Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. > In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538967#comment-17538967 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130270383

> @parthchandra Would you mind having a look at my I/O performance optimization plan for ParquetMR? I think we should coordinate, since we have some ideas that might overlap in what we touch.
> https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing

@theosib-amazon I read your document and went through #960. It looks like, for the most part, #960 and this PR complement each other. The overlap I see is in the changes to `MultiBufferInputStream`, where you have added the `readFully` and `skipFully` APIs.

The bulk of my changes for async IO are in a class derived from `MultiBufferInputStream`, and the heart of the changes depends on overriding `MultiBufferInputStream.nextBuffer`. In `MultiBufferInputStream.nextBuffer`, the assumption is that all the buffers have been read into. In `AsyncMultiBufferInputStream.nextBuffer`, this assumption is removed and the call *blocks* only if the next required buffer has not been read into.

Now, `skipFully` and `readFully` are potentially blocking calls because both call `nextBuffer` repeatedly if necessary. To gain maximum pipelining, you want to make calls to `skipFully` and `readFully` such that you never block for too long (or at all) in the call. You get this if you are skipping or reading fewer bytes than the size of a single buffer. This is generally the case, as decompression and decoding happen at the page level, and a page is smaller than a single buffer. However, for your optimizations, you should be aware of this behaviour.

From what I see, I don't think there will be a conflict. I'll pull in your PR and give it a deeper look.
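A minimal sketch of the blocking contract described above, under simplifying assumptions (one completion latch per buffer, counted down by a background reader thread); the actual `AsyncMultiBufferInputStream` in PR #968 differs in detail:

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Simplified sketch, not the PR's code: nextBuffer() advances through a
// fixed list of buffers and waits only when the background I/O thread has
// not yet filled the buffer it needs. With every buffer already filled it
// never blocks, which is the synchronous MultiBufferInputStream assumption.
class AsyncBufferSequenceSketch {
  private final List<ByteBuffer> buffers;
  private final List<CountDownLatch> filled; // latch i is counted down once buffer i is read into
  private int current = -1;

  AsyncBufferSequenceSketch(List<ByteBuffer> buffers, List<CountDownLatch> filled) {
    this.buffers = buffers;
    this.filled = filled;
  }

  /** Returns the next buffer, blocking only if its read has not completed yet. */
  ByteBuffer nextBuffer() throws InterruptedException {
    current++;
    if (current >= buffers.size()) {
      return null; // past the last buffer
    }
    filled.get(current).await(); // returns immediately if the buffer is already filled
    return buffers.get(current);
  }
}
```

A read or skip of less than one buffer's worth of bytes crosses into `nextBuffer` at most once, which is why page-level decompression and decoding, operating on pages smaller than a buffer, rarely block.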
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538945#comment-17538945 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130229884

> Great effort! Will have a look after the build succeeds.

@shangxinli I have no idea how to get the failed CI to pass. The failures appear to be in unrelated areas and look like infrastructure issues. Is there a way to trigger a rerun?
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538934#comment-17538934 ] ASF GitHub Bot commented on PARQUET-2149: - dbtsai commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130214186

cc @rdblue @gszadovszky @ggershinsky
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538912#comment-17538912 ] ASF GitHub Bot commented on PARQUET-2149: - theosib-amazon commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130176799

@parthchandra Would you mind having a look at my I/O performance optimization plan for ParquetMR? I think we should coordinate, since we have some ideas that might overlap in what we touch.
https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538895#comment-17538895 ] ASF GitHub Bot commented on PARQUET-2149: - shangxinli commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130130378

Great effort! Will have a look after the build succeeds.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538495#comment-17538495 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1129363998

I have some numbers from an internal benchmark using Spark. I didn't see any benchmarks in the Parquet codebase that I could reuse. Here are the numbers from my own benchmark:

- 10 runs; each run reads all columns from store_sales (the largest table) in the TPC-DS (100G) dataset: `spark.sql("select * from store_sales")`
- Sync reader with the default 8MB buffer size; async reader with a 1MB buffer size (achieves better pipelining)
- Run on a MacBook Pro, reading from S3. Spark has 6 cores.
- All times in seconds

| Run | Async | Sync | Async (w/o outliers) | Sync (w/o outliers) |
| ---:| ---:| ---:| ---:| ---:|
| 1 | 84 | 102 | - | - |
| 2 | 90 | 366 | 90 | 366 |
| 3 | 78 | 156 | - | 156 |
| 4 | 84 | 128 | 84 | - |
| 5 | 108 | 402 | - | - |
| 6 | 90 | 432 | 90 | - |
| 7 | 84 | 378 | 84 | 378 |
| 8 | 108 | 324 | - | 324 |
| 9 | 90 | 318 | 90 | 318 |
| 10 | 90 | 282 | 90 | 282 |
| Average | 90.6 | 288.8 | 88 | 304 |
| Median | 90 | 321 | **90** | **321** |
| StdDev | 9.98 | 119. | | |

After removing the two highest and two lowest runs for each case, and taking the median value: Async: 90 sec, Sync: 321 sec.
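For clarity, the outlier handling described above (drop the two highest and two lowest of the 10 runs, then take the median of the remaining six) can be expressed as a small helper; this is an illustration, not code from the PR or the benchmark harness:

```java
import java.util.Arrays;

// Illustration of the outlier handling described above: sort the run times,
// drop the two fastest and two slowest, and take the median of the rest.
public class TrimmedMedian {
  static double trimmedMedian(double[] runs) {
    double[] sorted = runs.clone();
    Arrays.sort(sorted);
    // drop the two lowest and two highest values
    double[] trimmed = Arrays.copyOfRange(sorted, 2, sorted.length - 2);
    int n = trimmed.length;
    return n % 2 == 1
        ? trimmed[n / 2]
        : (trimmed[n / 2 - 1] + trimmed[n / 2]) / 2.0;
  }

  public static void main(String[] args) {
    double[] async = {84, 90, 78, 84, 108, 90, 84, 108, 90, 90};
    double[] sync = {102, 366, 156, 128, 402, 432, 378, 324, 318, 282};
    System.out.println(trimmedMedian(async)); // 90.0
    System.out.println(trimmedMedian(sync));  // 321.0
  }
}
```

Applied to the run times in the table, this reproduces the reported 90 sec (async) and 321 sec (sync) medians.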
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538363#comment-17538363 ] ASF GitHub Bot commented on PARQUET-2149: - dbtsai commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1129148390

@parthchandra do you have a performance benchmark? Thanks
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538331#comment-17538331 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1129106516

Does anyone know why the CI checks are failing with a SocketTimeoutException, and what to do to address this?
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537813#comment-17537813 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra opened a new pull request, #968: URL: https://github.com/apache/parquet-mr/pull/968

### Jira

This PR addresses [PARQUET-2149](https://issues.apache.org/jira/browse/PARQUET-2149): Implement async IO for Parquet file reader.

### Tests

This PR adds the following unit tests:
- AsyncMultiBufferInputStream.*
- TestMultipleWriteRead.testReadWriteAsync
- TestColumnChunkPageWriteStore.testAsync

The PR is also tested by changing the default configuration to make all reads async and then ensuring all unit tests pass.