[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-06-06 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729708#comment-17729708
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

Fokko commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1578620658

   @steveloughran 駱 Looking forward to your talk!




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified):
>       - For every column -> read from storage in 8MB blocks -> read all
> uncompressed pages into an output queue
>       - From output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is very
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So,
> for column _i_: read one chunk at a time from storage until the end ->
> intermediate output queue -> read one uncompressed page at a time until the
> end -> output queue -> (downstream) decompression + decoding.
> Note that this can be made completely self-contained in ParquetFileReader;
> downstream implementations like Iceberg and Spark will automatically be able
> to take advantage without code changes, as long as the ParquetFileReader APIs
> are not changed.
> In past work with async I/O [Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java],
> I have seen a 2x-3x improvement in reading speed for Parquet files.
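
To make the proposed pipeline concrete, here is a minimal sketch of the producer/consumer hand-off described above, assuming a bounded queue between the I/O thread and the downstream decoder (the class and variable names are illustrative, not the PR's actual API):

```
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncColumnReadSketch {
  private static final ByteBuffer EOF = ByteBuffer.allocate(0); // poison pill

  public static void main(String[] args) throws Exception {
    ExecutorService ioPool = Executors.newFixedThreadPool(1);
    BlockingQueue<ByteBuffer> chunkQueue = new LinkedBlockingQueue<>(8);

    // Producer: stands in for "read from storage in 8MB blocks".
    ioPool.submit(() -> {
      for (int i = 0; i < 4; i++) {
        chunkQueue.put(ByteBuffer.allocate(8 * 1024 * 1024));
      }
      chunkQueue.put(EOF);
      return null;
    });

    // Consumer: decompression + decoding can start as soon as the first
    // chunk lands, instead of waiting for the whole column to be read.
    ByteBuffer chunk;
    while ((chunk = chunkQueue.take()) != EOF) {
      System.out.println("decode chunk of " + chunk.capacity() + " bytes");
    }
    ioPool.shutdown();
  }
}
```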



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-04-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711588#comment-17711588
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1506058505

   FWIW, the Hadoop 3.3.5 vectored IO changes might make this PR redundant.






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-04-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711388#comment-17711388
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

steveloughran commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1505269164

   @hazelnutsgz Hadoop 3.3.5 supports vectored IO on an S3 stream: asynchronous, 
parallel fetching of blocks, which also works on the local FS (GCS and ABFS 
support are TODO items). We see significant performance increases there. 
There's a PR for it, but since it is 3.3.5+ only, it won't be merged into the 
ASF Parquet branches unless they move to that version, or until we finish a 
shim library that offers (serialized) support for the API on older releases.
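
As a rough illustration of the vectored IO API referenced above, here is a hedged sketch against the Hadoop 3.3.5 `PositionedReadable`/`FileRange` interfaces; the file path and byte ranges are made up:

```
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://bucket/warehouse/part-00000.parquet"); // illustrative
    FileSystem fs = path.getFileSystem(new Configuration());
    try (FSDataInputStream in = fs.open(path)) {
      // Request several column-chunk ranges at once; the stream fetches them
      // asynchronously and may coalesce nearby ranges.
      List<FileRange> ranges = Arrays.asList(
          FileRange.createFileRange(0, 8 * 1024 * 1024),
          FileRange.createFileRange(64L * 1024 * 1024, 8 * 1024 * 1024));
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange r : ranges) {
        ByteBuffer data = r.getData().join(); // completes when that range arrives
        System.out.println("read " + data.remaining() + " bytes @ " + r.getOffset());
      }
    }
  }
}
```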






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-03-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704971#comment-17704971
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1483892749

   > @parthchandra Do you have time to resolve the conflicts? I think it would 
be nice to include this in the next release.
   
   Done
   






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-03-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704399#comment-17704399
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

wgtmac commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1482128840

   @parthchandra Do you have time to resolve the conflicts? I think it would be 
nice to include this in the next release.






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-03-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704398#comment-17704398
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1482127933

   > Sorry to interrupt, just wondering if this PR can work with 
[S3AsyncClient](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/S3AsyncClient.html)
 by any chance? @parthchandra
   > 
   > Thanks!
   
   This PR uses the HDFS interface to S3 (s3a) when the URL scheme is s3a. I 
don't think the s3a implementation uses S3AsyncClient.






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-03-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704031#comment-17704031
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

hazelnutsgz commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1480892881

   Sorry to interrupt, just wondering if this PR can work with 
[S3AsyncClient](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/S3AsyncClient.html)
  by any chance? @parthchandra 






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-02-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694116#comment-17694116
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1446796605

   Also try a query like 
   ```
 select
   SUM(length(IFNULL(ss_sold_date_sk, ' '))),
   SUM(length(IFNULL(ss_sold_time_sk, ' '))),
   SUM(length(IFNULL(ss_item_sk, ' '))),
   SUM(length(IFNULL(ss_customer_sk, ' '))),
   SUM(length(IFNULL(ss_cdemo_sk, ' '))),
   SUM(length(IFNULL(ss_hdemo_sk, ' '))),
   SUM(length(IFNULL(ss_addr_sk, ' '))),
   SUM(length(IFNULL(ss_store_sk, ' '))),
   SUM(length(IFNULL(ss_promo_sk, ' '))),
   SUM(length(IFNULL(ss_ticket_number, ' '))),
   SUM(ss_quantity),
   SUM(ss_wholesale_cost),
   SUM(ss_list_price),
   SUM(ss_sales_price),
   SUM(ss_ext_discount_amt),
   SUM(ss_ext_sales_price),
   SUM(ss_ext_wholesale_cost),
   SUM(ss_ext_list_price),
   SUM(ss_ext_tax),
   SUM(ss_coupon_amt),
   SUM(ss_net_paid),
   SUM(ss_net_paid_inc_tax),
   SUM(ss_net_profit)
   from store_sales
   
   ```
   which avoids the expensive sort.






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-02-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694065#comment-17694065
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

whcdjj commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1446487715

   > Looks correct to me. Couple of questions: are you running this on a 
cluster or on a local system? Also, is the data on SSDs? If you are on a single 
machine, there might not be enough CPU and the async threads may be contending 
for time with the processing threads. We'll need some profiling info to get a 
better diagnosis. Also, with SSDs, reads from the file system are so fast that 
the async feature might show very little improvement. You could turn on the 
debug logging level for FilePageReader and AsyncMultiBufferInputStream. We can 
continue this in a thread outside this PR.
   
   OK, I will test with debug logging enabled and report more details in the 
next few days.






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-02-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693554#comment-17693554
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1445230916

   Looks correct to me. Couple of questions: are you running this on a cluster 
or on a local system? Also, is the data on SSDs? If you are on a single 
machine, there might not be enough CPU and the async threads may be contending 
for time with the processing threads. We'll need some profiling info to get a 
better diagnosis. Also, with SSDs, reads from the file system are so fast that 
the async feature might show very little improvement. 
   You could turn on the debug logging level for FilePageReader and 
AsyncMultiBufferInputStream.
   We can continue this in a thread outside this PR. 
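   
   As a concrete example, debug logging could be enabled via a log4j 1.x 
properties file along these lines (a sketch only: AsyncMultiBufferInputStream 
lives in org.apache.parquet.bytes per the PR diff quoted later in this thread, 
while the FilePageReader package path is an assumption):
   ```
   # Sketch: adjust the package paths to the PR's actual layout.
   log4j.logger.org.apache.parquet.bytes.AsyncMultiBufferInputStream=DEBUG
   log4j.logger.org.apache.parquet.hadoop.FilePageReader=DEBUG
   ```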
   






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-02-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693437#comment-17693437
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

whcdjj commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1445021515

   > My test is spark.sql("select * from store_sales order by ss_customer_sk 
limit 10"); store_sales is a table from the 1TB TPC-DS dataset.
   
   The parquet-io and parquet-process thread pools are hardcoded like this in 
ParquetReadOptions.java:
   public static class Builder {
     protected ExecutorService ioThreadPool = Executors.newFixedThreadPool(4);
     protected ExecutorService processThreadPool = Executors.newFixedThreadPool(4);
   }
   
   I also ran the following test against the local filesystem using a 100GB 
TPC-DS store_sales table, and I see a regression with the async IO feature:
   test("parquet reader select") {
     val sc = SparkSession.builder().master("local[4]").getOrCreate()
     val df = sc.read.parquet("file:///D:\\work\\test\\tpcds\\store_sales")
     df.createOrReplaceTempView("table")
     val start = System.currentTimeMillis()
     sc.sql("select * from table order by ss_customer_sk limit 10").show()
     val end = System.currentTimeMillis()
     System.out.println("time: " + (end - start))
   }
   without this feature -> time: 7240
   with this feature -> time: 19923
   
   Threads are as expected:
   
![image](https://user-images.githubusercontent.com/87682445/221344885-a49cc5f2-9eba-4f0d-bc06-7615416d5b02.png)
   
   Where did I go wrong, and can you show me the correct way to use this 
feature?






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-02-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693284#comment-17693284
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1444106912

   > Hi, I am very interested in this optimization and just have some questions 
when testing in a cluster with 4 nodes/96 cores using Spark 3.1. Unfortunately, 
I see little improvement.
   
   You're likely to see improvement in cases where file I/O is the bottleneck. 
Most TPC-DS queries are join heavy and you will see little improvement there. 
You might do better with TPC-H. 
   
   > I am confused about whether it is necessary to keep 
spark.sql.parquet.enableVectorizedReader = false in Spark when testing with 
Spark 3.1, and how I can set the parquet buffer size. 
   
   It's probably best to keep the parquet (read) buffer size untouched.
   
   You should keep `spark.sql.parquet.enableVectorizedReader = true` 
irrespective of this feature. This feature improves the I/O speed of reading 
raw data. The Spark vectorized reader kicks in after data is read from storage; 
it converts the raw data into Spark's internal columnar representation and is 
faster than the row-based version.
   






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-02-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693023#comment-17693023
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

whcdjj commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1442878000

   Hi, I am very interested in this optimization and just have some questions 
when testing in a cluster with 4 nodes/96 cores using Spark 3.1. Unfortunately, 
I see little improvement.
   I am confused about whether it is necessary to keep 
spark.sql.parquet.enableVectorizedReader = false in Spark when testing with 
Spark 3.1, and how I can set the parquet buffer size. I sincerely ask for your 
advice. @parthchandra 






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2023-01-31 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682807#comment-17682807
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

wgtmac commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1411338630

   > @kazuyukitanimura @steveloughran @kbendick @ggershinsky @wgtmac 
@theosib-amazon Do you still have comments?
   
   It looks good to me. Please feel free to merge as you see fit. @shangxinli 






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-12-05 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643502#comment-17643502
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1039934636


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue<Future<Void>> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();
+  private LongAdder totalCountBlocked = new LongAdder();
+  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+    List<ByteBuffer> buffers) {
+    super(buffers);
+    this.fileInputStream = fileInputStream;
+    this.threadPool = threadPool;
+    readFutures = new LinkedBlockingQueue<>(buffers.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ASYNC: Begin read into buffers ");
+      for (ByteBuffer buf : buffers) {
+        LOG.debug("ASYNC: buffer {} ", buf);
+      }
+    }
+    fetchAll();
+  }
+
+  private void checkState() {
+    if (closed) {
+      throw new RuntimeException("Stream is closed");
+    }
+  }
+
+  private void fetchAll() {
+    checkState();
+    submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+    ByteBuffer buffer = buffers.get(bufferNo);
+    try {
+      readFutures.put(threadPool.submit(() -> {
+          readOneBuffer(buffer);
+          if (bufferNo < buffers.size() - 1) {
+            submitReadTask(bufferNo + 1);
+          }
+          return null;
+        })
+      );
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+    long startTime = System.nanoTime();
+    try {
+      fileInputStream.readFully(buffer);
+      buffer.flip();
+      long readCompleted = System.nanoTime();
+      long timeSpent = readCompleted - startTime;
+      LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+      fetchIndex++;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean nextBuffer() {
+    checkState();
+    // hack: parent constructor can call this method before this class is fully initialized.
+    // Just return without doing anything.
+    if (readFutures == null) {
+      return false;
+    }
+    if (readIndex < buffers.size()) {
+      long start = System.nanoTime();
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("ASYNC (next): Getting next buffer");
+        }
+        Future<Void> future = readFutures.take();
+        future.get();
+        long timeSpent = System.nanoTime() - start;
+        totalCountBlocked.add(1);
+        totalTimeBlocked.add(timeSpent);
+        maxTimeBlocked.accumulate(timeSpent);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("ASYNC (next): {}: Time blocked for read {} ns", this, timeSpent);
+        }
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642878#comment-17642878
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

shangxinli commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1336203668

   @kazuyukitanimura @steveloughran @kbendick @ggershinsky @wgtmac 
@theosib-amazon Do you still have comments? 






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642875#comment-17642875
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1038808056


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();
+  private LongAdder totalCountBlocked = new LongAdder();
+  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream 
fileInputStream,
+List buffers) {
+super(buffers);
+this.fileInputStream = fileInputStream;
+this.threadPool = threadPool;
+readFutures = new LinkedBlockingQueue<>(buffers.size());
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC: Begin read into buffers ");
+  for (ByteBuffer buf : buffers) {
+LOG.debug("ASYNC: buffer {} ", buf);
+  }
+}
+fetchAll();
+  }
+
+  private void checkState() {
+if (closed) {
+  throw new RuntimeException("Stream is closed");
+}
+  }
+
+  private void fetchAll() {
+checkState();
+submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+ByteBuffer buffer = buffers.get(bufferNo);
+try {
+  readFutures.put(threadPool.submit(() -> {
+  readOneBuffer(buffer);
+  if (bufferNo < buffers.size() - 1) {
+submitReadTask(bufferNo + 1);
+  }
+  return null;
+})
+  );
+} catch (InterruptedException e) {
+  Thread.currentThread().interrupt();
+  throw new RuntimeException(e);
+}
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+long startTime = System.nanoTime();
+try {
+  fileInputStream.readFully(buffer);
+  buffer.flip();
+  long readCompleted = System.nanoTime();
+  long timeSpent = readCompleted - startTime;
+  LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+  fetchIndex++;
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  @Override
+  public boolean nextBuffer() {
+checkState();
+// hack: parent constructor can call this method before this class is 
fully initialized.

Review Comment:
   I see the comment with 'hack'. What is the proper implementation? 






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-11-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638794#comment-17638794
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1327957394

   @wgtmac  Thank you!






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-11-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638053#comment-17638053
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

wgtmac commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1325873832

   > Updated. The thread pools are now set thru ParquetReadOptions
   
   Thanks for addressing this!
   
   I agree that adopting Hadoop vectored IO is an orthogonal topic and can be a 
separate patch when the API is ready. So I believe this patch is complete and 
have voted +1. Any improvement can be added via follow-up patches. I may 
request other maintainers for a final approval. @shangxinli 






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-11-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637996#comment-17637996
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1325768325

   Updated. The thread pools are now set thru ParquetReadOptions
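
   For illustration, a minimal sketch of what a caller wiring its own pool
   through the read options could look like. The builder method names
   `withAsyncIOReaderEnabled` and `withIOThreadPool` are assumptions modeled on
   this PR's discussion, not an established public API:

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import org.apache.parquet.ParquetReadOptions;

   ExecutorService ioPool =
       Executors.newFixedThreadPool(8, r -> new Thread(r, "parquet-io"));

   ParquetReadOptions options = ParquetReadOptions.builder()
       .withAsyncIOReaderEnabled(true) // hypothetical option from this PR
       .withIOThreadPool(ioPool)       // hypothetical setter from this PR
       .build();
   // Passing the same pool instance to every reader approximates a single
   // process-wide IO pool.
   ```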






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-11-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637935#comment-17637935
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1030813744


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java:
##
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import 
org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.BlockCipher.Decryptor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader.Chunk;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the reading of a single page.
+ */
+public class FilePageReader implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FilePageReader.class);
+
+  private final ParquetFileReader parquetFileReader;
+  private final Chunk chunk;
+  private final int currentBlock;
+  private final BlockCipher.Decryptor headerBlockDecryptor;
+  private final BlockCipher.Decryptor pageBlockDecryptor;
+  private final byte[] aadPrefix;
+  private final int rowGroupOrdinal;
+  private final int columnOrdinal;
+
+  //state
+  private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+  private DictionaryPage dictionaryPage = null;
+  private int pageIndex = 0;
+  private long valuesCountReadSoFar = 0;
+  private int dataPageCountReadSoFar = 0;
+
+  // derived
+  private final PrimitiveType type;
+  private final byte[] dataPageAAD;
+  private byte[] dataPageHeaderAAD = null;
+
+  private final BytesInputDecompressor decompressor;
+
+  private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+  private final LongAdder totalTimeReadOnePage = new LongAdder();
+  private final LongAdder totalCountReadOnePage = new LongAdder();
+  private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+  private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+  private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+  private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+  public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int 
currentBlock,
+Decryptor headerBlockDecryptor,
+Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+BytesInputDecompressor decompressor
+  ) {
+this.parquetFileReader = parquetFileReader;
+this.chunk = chunk;
+this.currentBlock = currentBlock;
+this.headerBlockDecryptor = headerBlockDecryptor;
+this.pageBlockDecryptor = pageBlockDecryptor;
+this.aadPrefix = aadPrefix;
+this.rowGroupOrdinal = rowGroupOrdinal;
+this.columnOrdinal = columnOrdinal;
+this.decompressor = decompressor;
+
+this.type = 

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-11-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636956#comment-17636956
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1322881480

   Let me verify that merging the thread pools is not dangerous and post an 
update. After that it'd be great if you can take it.






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-11-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636711#comment-17636711
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

wgtmac commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1322172984

   > > > IMO, switching `ioThreadPool` and `processThreadPool` to the reader instance level will make it more flexible.
   > > 
   > > 
   > > I've changed the thread pool so that it is not initialized by default 
but I left them as static members. Ideally, there should be a single IO thread 
pool that handles all the IO for a process and the size of the pool is 
determined by the bandwidth of the underlying storage system. Making them per 
instance is not an issue though. The calling code can decide to set the same 
thread pool for all instances and achieve the same result. Let me update this.
   > > Also, any changes you want to make are fine with me, and the help is 
certainly appreciated !
   > 
   > I'm thinking of merging the thread pools into a single `ioThreadPool` and 
making it settable thru `ParquetReadOptions` (like the allocator is). The work 
being done by the `processThreadPool` is rather small and maybe we can do away 
with it. Adding the pool via `ParquetReadOptions` makes it easier to use with 
`ParquetReader` (used a lot in unit tests). WDYT?
   
   Sorry for my late reply.
   
   Setting the thread pools via `ParquetReadOptions` is a good idea and that is 
exactly the way I want to do away with the static members. Merging 
`ioThreadPool` and `processThreadPool` into a single pool should work if the 
tasks in the `processThreadPool` do not wait for the return of tasks in the 
`ioThreadPool`. I will look into the detail later.
   
   BTW, I don't have the permission to directly update your PR in place as I am 
not yet a maintainer of the repo. I may need to open a new one by copying what 
you have done here and add you as a co-author. WDYT? If that sounds good to 
you, I can proceed. @parthchandra 
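
   For context on that caveat, a sketch (not from the PR) of why a merged pool
   is only safe when "process" tasks never block on "io" futures — with a
   bounded pool, a task that waits on another queued task deadlocks:

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   public class MergedPoolDeadlock {
     public static void main(String[] args) throws Exception {
       ExecutorService pool = Executors.newFixedThreadPool(1);
       Future<?> outer = pool.submit(() -> {
         // The inner task is queued behind this one and can never run,
         // because this task occupies the pool's only thread.
         Future<?> inner = pool.submit(() -> null);
         return inner.get(); // blocks forever: pool-starvation deadlock
       });
       outer.get(); // never returns
     }
   }
   ```

   An unbounded (cached) pool avoids this particular deadlock at the cost of
   unbounded thread creation, which is one reason merging the pools needs the
   verification mentioned above.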






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-11-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635582#comment-17635582
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1319411064

   > > IMO, switching `ioThreadPool` and `processThreadPool` to the reader instance level will make it more flexible.
   > 
   > I've changed the thread pool so that it is not initialized by default but 
I left them as static members. Ideally, there should be a single IO thread pool 
that handles all the IO for a process and the size of the pool is determined by 
the bandwidth of the underlying storage system. Making them per instance is not 
an issue though. The calling code can decide to set the same thread pool for 
all instances and achieve the same result. Let me update this.
   > 
   > Also, any changes you want to make are fine with me, and the help is 
certainly appreciated !
   
   I'm thinking of merging the thread pools into a single ioThreadPool and 
making the ioThreadPool settable thru `ParquetReadOptions` (like the allocator 
is). The work being done by the processThreadPool is rather small and maybe we 
can do away with it. 
   Adding the pool via `ParquetReadOptions`  makes it easier to use with 
`ParquetReader` (used a lot in unit tests).
   WDYT?






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-11-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635497#comment-17635497
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1319040697

   @wgtmac thank you for looking at this. I don't have any more TODOs on this 
PR. 
   
   > * Adopt the incoming Hadoop vectored io api.
   
   This should be part of another PR. There is a draft PR (#999 ) open for 
this. Once that is merged in, I can revisit the async I/O code and incorporate 
the vectored io api. 
   In other experiments I have seen that async io gives better results over 
slower networks. With faster network connections, as  is the case where we are 
reading from S3 within an AWS environment, reading in parallel (as the vector 
io api does), starts to give better results. 
   I believe that both should be available as options. 
   
   > * Benchmark against remote object stores from different cloud providers.
   
   The numbers I posted earlier were for reading from AWS/S3 over a 1 Gbps 
line. Reading from within AWS shows lesser improvement. I don't have an account 
with other cloud providers. Any help here would be appreciated. 
   
   > IMO, switching `ioThreadPool` and `processThreadPool` to the reader instance level will make it more flexible.
   
   I've changed the thread pool so that it is not initialized by default but I 
left them as static members. Ideally, there should be a single IO thread pool 
that handles all the IO for a process and the size of the pool is determined by 
the bandwidth of the underlying storage system. 
   Making them per instance is not an issue though. The calling code can decide 
to set the same thread pool for all instances and achieve the same result. 
   Let me update this. 
   
   Also, any changes you want to make are fine with me, and the help is 
certainly appreciated !
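
   As a concrete illustration of that single-process-wide-pool idea, a sketch
   under two assumptions: the static `setAsyncIOThreadPool` hook described in
   this PR's code comments, and a pool sized for storage bandwidth rather than
   CPU count:

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // One IO pool shared by every reader in the process. The thread count (16
   // here) is an arbitrary example; size it to the storage system's bandwidth,
   // not to Runtime.getRuntime().availableProcessors().
   ExecutorService sharedIoPool = Executors.newFixedThreadPool(16, r -> {
     Thread t = new Thread(r, "parquet-io");
     t.setDaemon(true); // don't keep the JVM alive for idle IO threads
     return t;
   });
   ParquetFileReader.setAsyncIOThreadPool(sharedIoPool);
   ```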
   






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-11-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635445#comment-17635445
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

wgtmac commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1318904541

   It looks like this PR is complete and all comments are addressed except some 
outstanding ones:
   
   - Adopt the incoming Hadoop vectored io api.
   - Benchmark against remote object stores from different cloud providers.
   
   IMO, switching `ioThreadPool` and `processThreadPool` to the reader instance level will make it more flexible. 
   
   @parthchandra Do you have any TODO list for this PR? If you are busy with 
some other stuff, I can continue to work on it because it is really a nice 
improvement.
   
   cc @shangxinli 






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-08-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580267#comment-17580267
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

ggershinsky commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r946662874


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = 
"parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call 
setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+r -> new Thread(r, "parquet-io"));
+
+  // Thread pool to process pages for multiple columns in parallel. 
Applications should call
+  // setAsyncProcessThreadPool to initialize this with their own 
implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService processThreadPool = 
Executors.newCachedThreadPool(

Review Comment:
   not sure; looks like many tests use copy/paste, rather than extension.







[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-08-05 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17576056#comment-17576056
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r927128065


##
parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java:
##
@@ -61,9 +65,10 @@ private HadoopReadOptions(boolean useSignedStringMinMax,
 Configuration conf,
 FileDecryptionProperties fileDecryptionProperties) 
{
 super(
-useSignedStringMinMax, useStatsFilter, useDictionaryFilter, 
useRecordFilter, useColumnIndexFilter,
-usePageChecksumVerification, useBloomFilter, recordFilter, 
metadataFilter, codecFactory, allocator,
-maxAllocationSize, properties, fileDecryptionProperties
+  useSignedStringMinMax, useStatsFilter, useDictionaryFilter, 
useRecordFilter,

Review Comment:
   That's how Intellij formatted it after I added the new parameters. Added 
them back.



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -796,6 +835,30 @@ public ParquetFileReader(InputFile file, 
ParquetReadOptions options) throws IOEx
 this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }
 
+  private boolean isAsyncIOReaderEnabled(){
+if (options.isAsyncIOReaderEnabled() ) {
+  if (ioThreadPool != null) {
+return true;
+  } else {
+LOG.warn("Parquet async IO is configured but the IO thread pool has 
not been " +
+  "initialized. Configuration is being ignored");
+  }
+}
+return false;
+  }
+
+  private boolean isParallelColumnReaderEnabled(){
+if (options.isParallelColumnReaderEnabled() ) {
+  if (processThreadPool != null) {
+return true;
+  } else {
+LOG.warn("Parallel column reading is configured but the process thread 
pool has " +

Review Comment:
   Ditto



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -1455,6 +1578,8 @@ protected PageHeader readPageHeader() throws IOException {
 }
 
 protected PageHeader readPageHeader(BlockCipher.Decryptor blockDecryptor, 
byte[] pageHeaderAAD) throws IOException {
+  String mode = (isAsyncIOReaderEnabled())? "ASYNC":"SYNC";

Review Comment:
   Done



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
 public long endPos() {
   return offset + length;
 }
+
+@Override
+public String toString() {
+  return "ConsecutivePartList{" +
+"offset=" + offset +
+", length=" + length +
+", chunks=" + chunks +
+'}';
+}
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+private final Chunk chunk;
+private final int currentBlock;
+private final BlockCipher.Decryptor headerBlockDecryptor;
+private final BlockCipher.Decryptor pageBlockDecryptor;
+private final byte[] aadPrefix;
+private final int rowGroupOrdinal;
+private final int columnOrdinal;
+
+//state
+private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+private DictionaryPage dictionaryPage = null;
+private int pageIndex = 0;
+private long valuesCountReadSoFar = 0;
+private int dataPageCountReadSoFar = 0;
+
+// derived
+private final PrimitiveType type;
+private final byte[] dataPageAAD;
+private final byte[] dictionaryPageAAD;
+private byte[] dataPageHeaderAAD = null;
+
+private final BytesInputDecompressor decompressor;
+
+private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+private final LongAdder totalTimeReadOnePage = new LongAdder();
+private final LongAdder totalCountReadOnePage = new LongAdder();
+private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+public PageReader(Chunk chunk, int currentBlock, Decryptor 
headerBlockDecryptor,
+  Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+  BytesInputDecompressor decompressor
+  ) {
+  this.chunk = chunk;
+  this.currentBlock = currentBlock;
+  this.headerBlockDecryptor = headerBlockDecryptor;
+  this.pageBlockDecryptor = pageBlockDecryptor;
+  this.aadPrefix = aadPrefix;
+  this.rowGroupOrdinal = 

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-22 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557430#comment-17557430
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

ggershinsky commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903697361


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
 public long endPos() {
   return offset + length;
 }
+
+@Override
+public String toString() {
+  return "ConsecutivePartList{" +
+"offset=" + offset +
+", length=" + length +
+", chunks=" + chunks +
+'}';
+}
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+private final Chunk chunk;
+private final int currentBlock;
+private final BlockCipher.Decryptor headerBlockDecryptor;
+private final BlockCipher.Decryptor pageBlockDecryptor;
+private final byte[] aadPrefix;
+private final int rowGroupOrdinal;
+private final int columnOrdinal;
+
+//state
+private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+private DictionaryPage dictionaryPage = null;
+private int pageIndex = 0;
+private long valuesCountReadSoFar = 0;
+private int dataPageCountReadSoFar = 0;
+
+// derived
+private final PrimitiveType type;
+private final byte[] dataPageAAD;
+private final byte[] dictionaryPageAAD;

Review Comment:
   probably not needed







[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-22 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557425#comment-17557425
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

ggershinsky commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903693351


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
 public long endPos() {
   return offset + length;
 }
+
+@Override
+public String toString() {
+  return "ConsecutivePartList{" +
+"offset=" + offset +
+", length=" + length +
+", chunks=" + chunks +
+'}';
+}
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+private final Chunk chunk;
+private final int currentBlock;
+private final BlockCipher.Decryptor headerBlockDecryptor;
+private final BlockCipher.Decryptor pageBlockDecryptor;
+private final byte[] aadPrefix;
+private final int rowGroupOrdinal;
+private final int columnOrdinal;
+
+//state
+private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+private DictionaryPage dictionaryPage = null;
+private int pageIndex = 0;
+private long valuesCountReadSoFar = 0;
+private int dataPageCountReadSoFar = 0;
+
+// derived
+private final PrimitiveType type;
+private final byte[] dataPageAAD;
+private final byte[] dictionaryPageAAD;
+private byte[] dataPageHeaderAAD = null;
+
+private final BytesInputDecompressor decompressor;
+
+private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+private final LongAdder totalTimeReadOnePage = new LongAdder();
+private final LongAdder totalCountReadOnePage = new LongAdder();
+private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+public PageReader(Chunk chunk, int currentBlock, Decryptor 
headerBlockDecryptor,
+  Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+  BytesInputDecompressor decompressor
+  ) {
+  this.chunk = chunk;
+  this.currentBlock = currentBlock;
+  this.headerBlockDecryptor = headerBlockDecryptor;
+  this.pageBlockDecryptor = pageBlockDecryptor;
+  this.aadPrefix = aadPrefix;
+  this.rowGroupOrdinal = rowGroupOrdinal;
+  this.columnOrdinal = columnOrdinal;
+  this.decompressor = decompressor;
+
+  this.type = getFileMetaData().getSchema()
+.getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+  if (null != headerBlockDecryptor) {
+dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+  rowGroupOrdinal,
+  columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+  }
+  if (null != pageBlockDecryptor) {
+dataPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPage, rowGroupOrdinal,
+  columnOrdinal, 0);
+dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPage,

Review Comment:
   Yep, the `dictionaryPageAAD` is not necessary here. This is a significant 
code change, more than just moving the current logic of
   ```java
   public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor 
headerBlockDecryptor, BlockCipher.Decryptor pageBlockDecryptor, byte[] 
aadPrefix, int rowGroupOrdinal, int columnOrdinal)
   ```
   
   I'll have a closer look at the details, but we need a unit test (proposed in 
my other comment) to make sure decryption works OK with async io and parallel 
column reading.






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-22 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557375#comment-17557375
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

ggershinsky commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903595526


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
 public long endPos() {
   return offset + length;
 }
+
+@Override
+public String toString() {
+  return "ConsecutivePartList{" +
+"offset=" + offset +
+", length=" + length +
+", chunks=" + chunks +
+'}';
+}
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {

Review Comment:
   maybe can also be separated from the ParquetFileReader, this is a chance to 
reduce the size of the latter :)







[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-22 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557373#comment-17557373
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

ggershinsky commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903592957


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = 
"parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call 
setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+r -> new Thread(r, "parquet-io"));
+
+  // Thread pool to process pages for multiple columns in parallel. 
Applications should call
+  // setAsyncProcessThreadPool to initialize this with their own 
implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService processThreadPool = 
Executors.newCachedThreadPool(

Review Comment:
   given the comment "Default initialization is useful only for testing", maybe 
this can be moved to the tests?







[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-22 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557369#comment-17557369
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

ggershinsky commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903469988


##
parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java:
##
@@ -61,10 +61,7 @@ public InternalFileDecryptor(FileDecryptionProperties 
fileDecryptionProperties)
 
   private BlockCipher.Decryptor getThriftModuleDecryptor(byte[] columnKey) {
 if (null == columnKey) { // Decryptor with footer key
-  if (null == aesGcmDecryptorWithFooterKey) {
-aesGcmDecryptorWithFooterKey = 
ModuleCipherFactory.getDecryptor(AesMode.GCM, footerKey);
-  }
-  return aesGcmDecryptorWithFooterKey;
+  return ModuleCipherFactory.getDecryptor(AesMode.GCM, footerKey);

Review Comment:
   could you add a unit test of decryption with async io and parallel column 
reader, e.g. to the 
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java#L507
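
   (Context, inferred from the diff rather than stated in the thread: the
   removed code memoized a single footer-key decryptor, and cipher objects such
   as `javax.crypto.Cipher` are stateful and not thread-safe, so one shared
   instance would break once this PR decrypts pages on multiple threads;
   returning a fresh decryptor per call sidesteps that. A minimal sketch of the
   underlying hazard:)

   ```java
   import javax.crypto.Cipher;

   // javax.crypto.Cipher is documented as not thread-safe: two threads calling
   // doFinal() on one shared instance interleave its internal state. Creating
   // a cipher (or a decryptor wrapping one) per call avoids any sharing.
   Cipher perCallCipher() throws Exception {
     return Cipher.getInstance("AES/GCM/NoPadding"); // fresh instance each time
   }
   ```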



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = 
"parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call 
setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService ioThreadPool = Executors.newCachedThreadPool(
+r -> new Thread(r, "parquet-io"));
+
+  // Thread pool to process pages for multiple columns in parallel. 
Applications should call
+  // setAsyncProcessThreadPool to initialize this with their own 
implementations.
+  // Default initialization is useful only for testing
+  public static ExecutorService processThreadPool = 
Executors.newCachedThreadPool(

Review Comment:
   should we be creating thread pools if the Async IO and parallel column 
reading are not activated? 
   (here and in the line 135)
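
   One standard way to avoid creating threads when the feature is off (a
   sketch, not the PR's code) is to create the pool lazily on first use with
   double-checked locking:

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   private static volatile ExecutorService ioThreadPool;

   static ExecutorService ioPool() {
     ExecutorService pool = ioThreadPool;
     if (pool == null) {
       synchronized (ParquetFileReader.class) {
         pool = ioThreadPool;
         if (pool == null) {
           // created only when async IO is actually used
           ioThreadPool = pool =
               Executors.newCachedThreadPool(r -> new Thread(r, "parquet-io"));
         }
       }
     }
     return pool;
   }
   ```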



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
 public long endPos() {
   return offset + length;
 }
+
+@Override
+public String toString() {
+  return "ConsecutivePartList{" +
+"offset=" + offset +
+", length=" + length +
+", chunks=" + chunks +
+'}';
+}
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {

Review Comment:
   we already have a PageReader (interface). Could you rename this class.



##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,158 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue<Future<Void>> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-22 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557307#comment-17557307
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

ggershinsky commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r903403821


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
 public long endPos() {
   return offset + length;
 }
+
+@Override
+public String toString() {
+  return "ConsecutivePartList{" +
+"offset=" + offset +
+", length=" + length +
+", chunks=" + chunks +
+'}';
+}
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+private final Chunk chunk;
+private final int currentBlock;
+private final BlockCipher.Decryptor headerBlockDecryptor;
+private final BlockCipher.Decryptor pageBlockDecryptor;
+private final byte[] aadPrefix;
+private final int rowGroupOrdinal;
+private final int columnOrdinal;
+
+//state
+private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+private DictionaryPage dictionaryPage = null;
+private int pageIndex = 0;
+private long valuesCountReadSoFar = 0;
+private int dataPageCountReadSoFar = 0;
+
+// derived
+private final PrimitiveType type;
+private final byte[] dataPageAAD;
+private final byte[] dictionaryPageAAD;
+private byte[] dataPageHeaderAAD = null;
+
+private final BytesInputDecompressor decompressor;
+
+private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+private final LongAdder totalTimeReadOnePage = new LongAdder();
+private final LongAdder totalCountReadOnePage = new LongAdder();
+private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+public PageReader(Chunk chunk, int currentBlock, Decryptor 
headerBlockDecryptor,
+  Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+  BytesInputDecompressor decompressor
+  ) {
+  this.chunk = chunk;
+  this.currentBlock = currentBlock;
+  this.headerBlockDecryptor = headerBlockDecryptor;
+  this.pageBlockDecryptor = pageBlockDecryptor;
+  this.aadPrefix = aadPrefix;
+  this.rowGroupOrdinal = rowGroupOrdinal;
+  this.columnOrdinal = columnOrdinal;
+  this.decompressor = decompressor;
+
+  this.type = getFileMetaData().getSchema()
+.getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+  if (null != headerBlockDecryptor) {
+dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPageHeader,
+  rowGroupOrdinal,
+  columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+  }
+  if (null != pageBlockDecryptor) {
+dataPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DataPage, rowGroupOrdinal,
+  columnOrdinal, 0);
+dictionaryPageAAD = AesCipher.createModuleAAD(aadPrefix, 
ModuleType.DictionaryPage,

Review Comment:
   sure





> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> 

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-22 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557305#comment-17557305
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

ggershinsky commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r90337


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,158 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue<Future<Void>> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();
+  private LongAdder totalCountBlocked = new LongAdder();
+  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+    List<ByteBuffer> buffers) {
+super(buffers);
+this.fileInputStream = fileInputStream;
+this.threadPool = threadPool;
+readFutures = new LinkedBlockingQueue<>(buffers.size());
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC: Begin read into buffers ");
+  for (ByteBuffer buf : buffers) {
+LOG.debug("ASYNC: buffer {} ", buf);
+  }
+}
+fetchAll();
+  }
+
+  private void checkState() {
+if (closed) {
+  throw new RuntimeException("Stream is closed");
+}
+  }
+
+  private void fetchAll() {
+checkState();
+submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+ByteBuffer buffer = buffers.get(bufferNo);
+try {
+  readFutures.put(threadPool.submit(() -> {
+  readOneBuffer(buffer);
+  if (bufferNo < buffers.size() - 1) {
+submitReadTask(bufferNo + 1);
+  }
+  return null;
+})
+  );
+} catch (InterruptedException e) {
+  Thread.currentThread().interrupt();
+  throw new RuntimeException(e);
+}
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+long startTime = System.nanoTime();
+try {
+  fileInputStream.readFully(buffer);
+  buffer.flip();
+  long readCompleted = System.nanoTime();
+  long timeSpent = readCompleted - startTime;
+  LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+  fetchIndex++;
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  @Override
+  public boolean nextBuffer() {
+checkState();
+// hack: parent constructor can call this method before this class is fully initialized.
+// Just return without doing anything.
+if (readFutures == null) {
+  return false;
+}
+if (readIndex < buffers.size()) {
+  long start = System.nanoTime();
+  try {
+LOG.debug("ASYNC (next): Getting next buffer");
+Future<Void> future = readFutures.take();
+future.get();
+long timeSpent = System.nanoTime() - start;
+totalCountBlocked.add(1);
+totalTimeBlocked.add(timeSpent);
+maxTimeBlocked.accumulate(timeSpent);
+LOG.debug("ASYNC (next): {}: Time blocked for read {} ns", this, 
timeSpent);

Review Comment:
   should `if (LOG.isDebugEnabled()) {` be added here and at line 118? This check is performed in the constructor (line 58); `nextBuffer()` is called with the same or higher frequency.
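
   Concretely, the guard would wrap the logging call so the argument setup is skipped when debug is off (a sketch; with SLF4J the message string itself is only formatted lazily anyway):

      if (LOG.isDebugEnabled()) {
        LOG.debug("ASYNC (next): {}: Time blocked for read {} ns", this, timeSpent);
      }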





> Implement async 

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556480#comment-17556480
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1160693399

   @shangxinli Thank you for the review! I'll address these comments asap.
   I am reviewing the thread pool and its initialization. IMO, it is better if there is no default initialization of the pool and the calling application/framework does so explicitly. One side effect of the default initialization is that the pool is created unnecessarily even if async is off. Also, if an application shades and includes another copy of the library (or, transitively, many more), then one more thread pool gets created for every version of the library included.
   It is probably a better idea to allow the thread pool to be assigned as a per-instance variable. The calling application can then decide to use a single pool for all instances, or a new one per instance, whichever works better for its use case.
   Finally, some large scale testing has revealed a possible resource leak. I'm 
looking into addressing it. 
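
   A sketch of the per-instance idea (the builder method is illustrative, not the PR's API):

      ExecutorService sharedPool = Executors.newFixedThreadPool(8,
          r -> new Thread(r, "parquet-io"));
      ParquetReadOptions options = ParquetReadOptions.builder()
          .withIOThreadPool(sharedPool)   // hypothetical per-instance setter
          .build();
      // Readers built with these options share one pool; a caller could just as
      // easily hand each reader its own pool if isolation matters more.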




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556467#comment-17556467
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

steveloughran commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r901859862


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = 
"parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();

Review Comment:
   dynamically changing the number of threads/buffer sizes/cache sizes is a recurrent source of pain in past work; once you get to 128-core systems, such defaults often end up asking for too much of a limited resource.
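
   In other words, an explicit, configurable bound tends to age better than scaling with the core count; a sketch (the property name is made up for illustration):

      int ioThreads = conf.getInt("parquet.read.io.threads", 4);   // assumed key
      ExecutorService ioPool = Executors.newFixedThreadPool(ioThreads,
          r -> new Thread(r, "parquet-io"));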





> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556109#comment-17556109
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r900994027


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -46,12 +46,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;

Review Comment:
   I guess the IDE does that, but let's not use a wildcard here.
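
   i.e. keep them explicit: the five the wildcard replaced, plus the queue types the new code in this PR uses:

      import java.util.concurrent.Callable;
      import java.util.concurrent.ConcurrentLinkedQueue;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Future;
      import java.util.concurrent.LinkedBlockingDeque;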



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -126,6 +127,42 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = 
"parquet.metadata.read.parallelism";
 
+  public static int numProcessors = Runtime.getRuntime().availableProcessors();
+
+  // Thread pool to read column chunk data from disk. Applications should call 
setAsyncIOThreadPool
+  // to initialize this with their own implementations.
+  // Default initialization is useful only for testing

Review Comment:
   I understand we want applications to provide their own implementations, but can you share why we chose a cached thread pool instead of a fixed one as the default? I kind of feel a lot of Parquet user scenarios involve unpredictable execution times, and we need better control over our program's resource consumption.
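
   For comparison, a bounded default along those lines would be (a sketch; numProcessors is the field declared above, and the thread name is illustrative):

      public static ExecutorService processThreadPool = Executors.newFixedThreadPool(
          numProcessors, r -> new Thread(r, "parquet-process"));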



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -1387,8 +1489,13 @@ public void close() throws IOException {
* result of the column-index based filtering when some pages might be 
skipped at reading.
*/
   private class ChunkListBuilder {
+// ChunkData is backed by either a list of buffers or a list of strams

Review Comment:
   typo? streams? 



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##
@@ -1796,5 +1882,314 @@ public void readAll(SeekableInputStream f, 
ChunkListBuilder builder) throws IOEx
 public long endPos() {
   return offset + length;
 }
+
+@Override
+public String toString() {
+  return "ConsecutivePartList{" +
+"offset=" + offset +
+", length=" + length +
+", chunks=" + chunks +
+'}';
+}
   }
+
+  /**
+   * Encapsulates the reading of a single page.
+   */
+  public class PageReader implements Closeable {
+private final Chunk chunk;
+private final int currentBlock;
+private final BlockCipher.Decryptor headerBlockDecryptor;
+private final BlockCipher.Decryptor pageBlockDecryptor;
+private final byte[] aadPrefix;
+private final int rowGroupOrdinal;
+private final int columnOrdinal;
+
+//state
+private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+private DictionaryPage dictionaryPage = null;
+private int pageIndex = 0;
+private long valuesCountReadSoFar = 0;
+private int dataPageCountReadSoFar = 0;
+
+// derived
+private final PrimitiveType type;
+private final byte[] dataPageAAD;
+private final byte[] dictionaryPageAAD;
+private byte[] dataPageHeaderAAD = null;
+
+private final BytesInputDecompressor decompressor;
+
+private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+private final LongAdder totalTimeReadOnePage = new LongAdder();
+private final LongAdder totalCountReadOnePage = new LongAdder();
+private final LongAccumulator maxTimeReadOnePage = new 
LongAccumulator(Long::max, 0L);
+private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+private final LongAccumulator maxTimeBlockedPagesInChunk = new 
LongAccumulator(Long::max, 0L);
+
+public PageReader(Chunk chunk, int currentBlock, Decryptor 
headerBlockDecryptor,
+  Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int 
columnOrdinal,
+  BytesInputDecompressor decompressor
+  ) {
+  this.chunk = chunk;
+  this.currentBlock = currentBlock;
+  this.headerBlockDecryptor = headerBlockDecryptor;
+  this.pageBlockDecryptor = pageBlockDecryptor;
+  this.aadPrefix = aadPrefix;
+  this.rowGroupOrdinal = rowGroupOrdinal;
+  this.columnOrdinal = columnOrdinal;
+  this.decompressor = decompressor;
+
+  this.type = getFileMetaData().getSchema()
+.getType(chunk.descriptor.col.getPath()).asPrimitiveType();
+
+  if (null != headerBlockDecryptor) {

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555935#comment-17555935
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r900993899


##
parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java:
##
@@ -61,9 +65,10 @@ private HadoopReadOptions(boolean useSignedStringMinMax,
 Configuration conf,
 FileDecryptionProperties fileDecryptionProperties) 
{
 super(
-useSignedStringMinMax, useStatsFilter, useDictionaryFilter, 
useRecordFilter, useColumnIndexFilter,
-usePageChecksumVerification, useBloomFilter, recordFilter, 
metadataFilter, codecFactory, allocator,
-maxAllocationSize, properties, fileDecryptionProperties
+  useSignedStringMinMax, useStatsFilter, useDictionaryFilter, 
useRecordFilter,

Review Comment:
   it seems two spaces were removed. 





> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17553586#comment-17553586
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

steveloughran commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1153924743

   (i could of course add those probes into the shim class, so at least that 
access of internals was in one place)




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17553585#comment-17553585
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

steveloughran commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1153923501

   bq.  perhaps check if the ByteBufferReadable interface is implemented in the 
stream?
   
   The requirement for `hasCapability("in:readbytebuffer")` to return true postdates the API; there's no way to be confident, if the probe returns false (or hasPathCapability() isn't available), that the method *isn't actually there*
   
   see #951 for a design which will trust a `true` response, falling back to looking at the wrapped stream. Note that as it calls getWrapped() it is calling methods tagged LimitedPrivate; it shouldn't really do that... at the very least hadoop needs a PR saying "we need to do this because..." and then that tag can be changed
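
   Concretely, the probe-then-inspect shape looks something like this (a sketch; `in` is an FSDataInputStream, and getWrappedStream() is the LimitedPrivate call mentioned above):

      boolean byteBufferReadable;
      if (in.hasCapability("in:readbytebuffer")) {
        byteBufferReadable = true;   // a true response can be trusted
      } else {
        // false may be a false negative on older streams, so inspect the wrapped stream
        byteBufferReadable = in.getWrappedStream() instanceof ByteBufferReadable;
      }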




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17552370#comment-17552370
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1151467274

   Sounds good.
   
   Also, perhaps check if the ByteBufferReadable interface is implemented in 
the stream?
   > ByteBufferReadable will raise UnsupportedException if not found, there is 
a check for it
   > 
https://github.com/steveloughran/fs-api-shim/blob/main/fs-api-shim-library/src/main/java/org/apache/hadoop/fs/shim/FSDataInputStreamShim.java
   
   




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17552350#comment-17552350
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

steveloughran commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1151417126

   I've started work on a fs-api-shim library, with the goal of "apps compile 
against hadoop 3.2.0 can get access to the 3.3 and 3.4 APIs when available 
either with transparent fallback (openFile()) or ability to probe the API 
before trying to invoke
   
   https://github.com/steveloughran/fs-api-shim
   
   openfile takes the seek & status params, falls back to open()
   : 
https://github.com/steveloughran/fs-api-shim/blob/main/fs-api-shim-library/src/main/java/org/apache/hadoop/fs/shim/FileSystemShim.java#L87
   
   ByteBufferReadable will raise UnsupportedException if not found, there is a 
check for it
   
https://github.com/steveloughran/fs-api-shim/blob/main/fs-api-shim-library/src/main/java/org/apache/hadoop/fs/shim/FSDataInputStreamShim.java
   
   Vector IO SHALL be available the same way
   
   Adopt 3.2.0 then and we will help give the library the ability to use the 
newer api calls, even stuff not yet shipped in apache releases.
   
   (I want to release this as an asf artifact with oversight by hadoop project. 
lets us maintain it)
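
   For reference, the 3.3+ openFile() builder path looks roughly like this when compiled against 3.3+ (the shim falls back to plain fs.open(path) via reflection on older releases):

      FSDataInputStream in;
      try {
        in = fs.openFile(path)
               .opt("fs.option.openfile.read.policy", "random")   // hint; ignored where unknown
               .build()
               .get();
      } catch (InterruptedException | ExecutionException e) {
        throw new IOException("openFile failed for " + path, e);
      }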
   
   
   




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-06-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17552331#comment-17552331
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

steveloughran commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r893760667


##
parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java:
##
@@ -0,0 +1,269 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ *   A bare minimum implementation of a {@link java.io.SequenceInputStream} that wraps an
+ *   ordered collection of ByteBufferInputStreams.
+ *   
+ *   This class, as implemented, is intended only for a specific use in the ParquetFileReader and
+ *   throws {@link UnsupportedOperationException} in unimplemented methods to catch any unintended
+ *   use in other cases.
+ *   
+ *   Even though this class is derived from ByteBufferInputStream, it explicitly does not support any
+ *   byte-buffer-related methods like slice. It does, however, support sliceBuffers, which is a
+ *   curious case of reading data from the underlying streams.
+ *   
+ *   Even though this class changes the state of the underlying streams (by reading from them),
+ *   it does not own them, and so the close method does not close the streams. To avoid resource
+ *   leaks, the calling code should close the underlying streams.
+ */
+public class SequenceByteBufferInputStream extends ByteBufferInputStream {
+
+  Collection<ByteBufferInputStream> collection;
+  Iterator<ByteBufferInputStream> iterator;
+  ByteBufferInputStream current;
+  long position = 0;
+
+  @Override
+  public String toString() {
+return "SequenceByteBufferInputStream{" +
+  "collection=" + collection +
+  ", current=" + current +
+  ", position=" + position +
+  '}';
+  }
+
+  public SequenceByteBufferInputStream(Collection<ByteBufferInputStream> collection) {
+this.collection = collection;
+iterator = collection.iterator();
+current = iterator.hasNext() ? iterator.next() : null;
+if (current == null) {
+  throw new UnsupportedOperationException(
+"Initializing SequenceByteBufferInputStream with an empty collection 
is not supported");
+}
+  }
+
+  @Override
+  public long position() {
+return position;
+  }
+
+  @Override
+  public int read(ByteBuffer out) {

Review Comment:
   good to know. 





> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> 

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17542147#comment-17542147
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

theosib-amazon commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1137610598

   That batch reader in Presto reminds me of some of the experimental changes I made in Trino. I modified PrimitiveColumnReader to work out how many of each data item it needs to read from the data source and request all of them at once in an array. This doubled the performance of some TPCDS queries. This is why I have array access methods planned for ParquetMR. (https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing) Requesting data in bulk saves a lot on function call overhead for each data item.
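
   Illustratively, the planned bulk shape replaces one virtual call per value with one call per batch (the method name is hypothetical):

      // per-value:  for (int i = 0; i < n; i++) dst[i] = reader.readInteger();
      // bulk:       one call fills the caller's array
      int filled = reader.readIntegers(dst, 0, n);   // hypothetical bulk method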




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17542117#comment-17542117
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

sunchao commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1137568979

   > and yes, updating parquet dependencies would be good, hadoop 3.3.0 should 
be the baseline.
   
   +1 on upgrading to 3.3.0, although currently parquet is using 2.10.1 as a 
provided dependency and we need to make sure it continues to work with hadoop 
2.x
   
   > It may be because I was using Spark's vectorized parquet decoding which is 
an order of magnitude faster than parquet library's row by row decoding (see 
[Spark 
benchmarks](https://github.com/apache/spark/blob/master/sql/core/benchmarks/DataSourceReadBenchmark-results.txt)).
 If trino is not doing vectorized decoding (I took a very quick look and I 
don't think it is), I would suggest you can look into that next. All the cool 
kids are doing it.
   
   Presto already has a [batch 
reader](https://github.com/prestodb/presto/tree/master/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader)
 but seems the feature is not in Trino yet. The batch reader did help a lot to 
reduce the CPU load. See [the 
slides](https://github.com/prestodb/presto/wiki/files/presto-meetup-oct-2019/uber.pdf).




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541757#comment-17541757
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136552664

   > This is interesting, because when I did profiling of Trino, I found that 
although I/O (from S3, over the network no less) was significant, even more 
time was spent in compute. Maybe you're getting improved performance because 
you're increasing _parallelism_ between I/O and compute.
   
   It may be because I was using Spark's vectorized parquet decoding which is 
an order of magnitude faster than parquet library's row by row decoding (see 
[Spark 
benchmarks](https://github.com/apache/spark/blob/master/sql/core/benchmarks/DataSourceReadBenchmark-results.txt)).
 If trino is not doing vectorized decoding (I took a very quick look and I 
don't think it is), I would suggest you can look into that next. All the cool 
kids are doing it. 




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541754#comment-17541754
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136542868

   > So it sounds like you're optimizing one layer of processing, and I'm optimizing the next layer up, and it's kind of a coincidence that we're touching some of the same classes just because code reuse has been possible here.
   
   Yeah, kind of cool :)
   




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541752#comment-17541752
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136531385

   BTW, adding more tests for the InputStream implementations. 




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541750#comment-17541750
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

theosib-amazon commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136528207

   > > Is byte (and arrays and buffers of bytes) the only datatype you support? My PR is optimizing code paths that pull ints, longs, and other sizes out of the data buffers. Are those not necessary for any of the situations where you're using an async buffer?
   >
   > The input stream API is generally unaware of the datatypes of its contents and so those are the only apis I use. The other reason is that the ParquetFileReader returns Pages which basically contain metadata and ByteBuffers of _compressed_ data. The decompression and decoding into types comes much later in a downstream thread.
   >
   > For your PR, I don't think the AsyncMultibufferInputStream is ever going to be in play in the paths you're optimizing. But just in case it is, your type-aware methods will work as is because AsyncMultibufferInputStream is derived from MultiBufferInputStream and will inherit those methods.
   
   I'm still learning Parquet's structure. So it sounds to me like these buffer input streams are used twice: once to get data and decompress it, and then once again to decode it into data structures. Is that correct? So it sounds like you're optimizing one layer of processing, and I'm optimizing the next layer up, and it's kind of a coincidence that we're touching some of the same classes just because code reuse has been possible here.




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) - 
>       - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> For Column _i_ -> reading one chunk until end, from storage -> intermediate 
> output queue -> read one uncompressed page until end -> output queue -> 
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code change as long as the ParquetFileReader apis 
> are not changed. 
> In past work with async io  [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541749#comment-17541749
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136528026

   > Latency is the killer; in an HTTP request you want to read enough but not discard data or break an http connection if the client suddenly does a seek() or readFully() somewhere else. file listings, existence checks etc.
   > 
   > That'd be great. now, if you could also handle requesting different columns in parallel and processing them out of order.
   
   I do. The Parquet file reader API that reads row groups in sync mode reads all columns in sequence. In async mode, it fires off a task for every column, blocking only to read the first page of each column before returning. This part also uses a different thread pool from the IO tasks, so that IO tasks never wait for a free thread in the pool.
   
   > 
   > be good to think about vectored IO.
   
   I think I know how to integrate this PR with the vectored IO, but this is 
only after a cursory look. 
   
   > 
   > and yes, updating parquet dependencies would be good, hadoop 3.3.0 should 
be the baseline.
   
   Who can drive this  (presumably) non-trivial change? I myself have no karma 
points :(

   > just sketched out my thoughts on this. I've played with some of this in my 
own branch. I think the next step would be for me to look at the benchmark code 
to make it targetable elsewhere.
   > 
   > 
https://docs.google.com/document/d/1y9oOSYbI6fFt547zcQJ0BD8VgvJWdyHBveaiCHzk79k/
   
   This is great. I now have much more context on where you are coming from
(and going to)!






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541748#comment-17541748
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

theosib-amazon commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136526711

   This is interesting, because when I profiled Trino, I found that although
I/O (from S3, over the network no less) was significant, even more time was
spent in compute. Maybe you're getting improved performance because you're
increasing *parallelism* between I/O and compute.






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541725#comment-17541725
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

steveloughran commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136465506

   > At this point the bottlenecks in parquet begin to move towards 
decompression and decoding but IO remains the slowest link in the chain.
   
   Latency is the killer; in an HTTP request you want to read enough but not
discard data or break an HTTP connection if the client suddenly does a seek()
or readFully() somewhere else. The same goes for file listings, existence
checks, etc.
   
   > One thing we get with my PR is that the ParquetFileReader had assumptions 
built in that all data must be read before downstream can proceed. Some of my 
changes are related to removing these assumptions and ensuring that downstream 
processing does not block until an entire column is read so we get efficient 
pipelining.
   
   That'd be great. Now, if you could also handle requesting different columns
in parallel and processing them out of order.
   
   > What does the 128 MB block mean? Is this the amount prefetched for a 
stream? The read API does not block until the entire block is filled, I presume.
   
   This was the abfs client set to do four GET requests of 128 MB each. This
would be awful for column stores, where smaller ranges are often
requested/processed before another seek is made, but quite often parquet does
more back-to-back reads than just one read/readFully request.
   
   > With my PR, parquet IO is reading 8MB at a time (default) and downstream 
is processing 1MB at a time (default) and several such streams (one per column) 
are in progress at the same time. Hopefully, this read pattern would work with 
the prefetch.
   
   be good to think about vectored IO.
   
   and yes, updating parquet dependencies would be good, hadoop 3.3.0 should be 
the baseline.
   
   just sketched out my thoughts on this. I've played with some of this in my 
own branch. I think the next step would be for me to look at the benchmark code 
to make it targetable elsewhere.
   
   
   
https://docs.google.com/document/d/1y9oOSYbI6fFt547zcQJ0BD8VgvJWdyHBveaiCHzk79k/
   






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541717#comment-17541717
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136439133

   > Is byte (and arrays and buffers of bytes) the only datatype you support? 
My PR is optimizing code paths that pull ints, longs, and other sizes out of 
the data buffers. Are those not necessary for any of the situations where 
you're using an async buffer?
   The input stream API is generally unaware of the datatypes of its contents,
so those are the only APIs I use. The other reason is that the
ParquetFileReader returns Pages, which basically contain metadata and
ByteBuffers of _compressed_ data. The decompression and decoding into types
comes much later, in a downstream thread.
   For your PR, I don't think the AsyncMultiBufferInputStream is ever going to
be in play in the paths you're optimizing. But just in case it is, your
type-aware methods will work as is, because AsyncMultiBufferInputStream is
derived from MultiBufferInputStream and will inherit those methods.
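   A tiny sketch of why that holds (class and method names are stand-ins, not
the real parquet classes):

   import java.nio.ByteBuffer;

   class MultiBufferSketch {
     protected ByteBuffer current = ByteBuffer.allocate(8).putInt(0, 42);
     // A type-aware helper defined once on the base class.
     int readInt() { return current.getInt(); }
   }

   class AsyncMultiBufferSketch extends MultiBufferSketch {
     // Only the buffer-filling strategy would change; readInt() is inherited unchanged.
   }

   class InheritDemo {
     public static void main(String[] args) {
       System.out.println(new AsyncMultiBufferSketch().readInt()); // prints 42
     }
   }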
   






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541699#comment-17541699
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136427603

   > Thanks; that means you are current with all shipping improvements. The
main extra one is to use openFile(), passing in the length and requesting
random IO. This guarantees ranged GET requests and cuts the initial HEAD probe
for the existence/size of the file.
   
   By `openFile()` do you mean
`FileSystem.openFileWithOptions(Path,OpenFileParameters)`?
   While looking, I realized that Parquet builds with a [much older version of
hadoop](https://github.com/apache/parquet-mr/blob/a2da156b251d13bce1fa81eb95b555da04880bc1/pom.xml#L79).
   > > > have you benchmarked this change with abfs or google gcs connectors to 
see what difference it makes there?
   > 
   > > No I have not. Would love help from anyone in the community with access 
to these. I only have access to S3.
   > 
   > that I have. FWIW, with the right tuning of abfs prefetch (4 threads, 128
MB blocks) I can get the full FTTH link rate from a remote store: 700 Mbit/s.
That's to the base station; once you add wifi the bottlenecks move.
   
   Wow! That is nearly as fast as a local HDD. At this point the bottlenecks in
parquet begin to move towards decompression and decoding, but IO remains the
slowest link in the chain. One thing we get with my PR is that the
ParquetFileReader had assumptions built in that all data must be read before
downstream can proceed. Some of my changes are related to removing these
assumptions and ensuring that downstream processing does not block until an
entire column is read, so we get efficient pipelining.
   What does the 128 MB block mean? Is this the amount prefetched for a stream?
The read API does not block until the entire block is filled, I presume.
   With my PR, parquet IO is reading 8 MB at a time (default), downstream is
processing 1 MB at a time (default), and several such streams (one per column)
are in progress at the same time. Hopefully, this read pattern would work with
the prefetch.
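   Roughly, the shape of one such per-column stream (sizes and names here are
illustrative, not the PR's actual code):

   import java.nio.ByteBuffer;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   class PipelineSketch {
     static final int READ_CHUNK = 8 * 1024 * 1024;  // IO granularity
     static final int PROCESS_CHUNK = 1024 * 1024;   // downstream granularity

     public static void main(String[] args) throws InterruptedException {
       BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>(4);
       Thread io = new Thread(() -> {
         try {
           for (int i = 0; i < 3; i++) {
             queue.put(ByteBuffer.allocate(READ_CHUNK)); // stand-in for readFully()
           }
           queue.put(ByteBuffer.allocate(0));            // end-of-stream marker
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
       });
       io.start();
       ByteBuffer buf;
       while ((buf = queue.take()).capacity() > 0) {
         // Downstream drains each 8 MB buffer in 1 MB slices without waiting
         // for the whole column to arrive.
         for (int off = 0; off < buf.capacity(); off += PROCESS_CHUNK) {
           // decode slice [off, off + PROCESS_CHUNK)
         }
       }
     }
   }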






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541618#comment-17541618
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r880747716


##
parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java:
##
@@ -1,14 +1,14 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 

Review Comment:
   ack







[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541600#comment-17541600
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

theosib-amazon commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1136123313

   Is byte (and arrays and buffers of bytes) the only datatype you support? My 
PR is optimizing code paths that pull ints, longs, and other sizes out of the 
data buffers. Are those not necessary for any of the situations where you're 
using an async buffer?






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541590#comment-17541590
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

theosib-amazon commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r880691495


##
parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java:
##
@@ -1,14 +1,14 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 

Review Comment:
   It looks like you're running in to the same bug in IntelliJ as I am, where 
it makes whitespace chances without authorization. Would you mind commenting on 
this bug report that I filed?
   
https://youtrack.jetbrains.com/issue/IDEA-293197/IntelliJ-makes-unauthorized-changes-to-whitespace-in-comments-wo







[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541359#comment-17541359
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

steveloughran commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1135585289

   > I was working with s3a
   > Spark 3.2.1
   > Hadoop (Hadoop-aws) 3.3.2
   > AWS SDK 1.11.655
   
   Thanks; that means you are current with all shipping improvements. The main
extra one is to use openFile(), passing in the length and requesting random
IO. This guarantees ranged GET requests and cuts the initial HEAD probe for
the existence/size of the file.
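   For reference, a sketch of that call against the Hadoop 3.3+ openFile()
builder (the read-policy key shown here is the one later 3.3.x releases use;
older releases spelled it fs.s3a.experimental.input.fadvise):

   import org.apache.hadoop.fs.FSDataInputStream;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;

   class OpenFileSketch {
     static FSDataInputStream openForRandomIO(FileSystem fs, Path path, FileStatus status)
         throws Exception {
       return fs.openFile(path)
           .withFileStatus(status) // supplies the length, cutting the HEAD probe
           .opt("fs.option.openfile.read.policy", "random") // favour ranged GETs
           .build()  // returns a CompletableFuture<FSDataInputStream>
           .get();
     }
   }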
   
   >> have you benchmarked this change with abfs or google gcs connectors to 
see what difference it makes there?
   
   > No I have not. Would love help from anyone in the community with access to 
these. I only have access to S3.
   
   that I have. FWIW, with the right tuning of abfs prefetch (4 threads, 128 MB
blocks) I can get the full FTTH link rate from a remote store: 700 Mbit/s.
That's to the base station; once you add wifi the bottlenecks move.
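   (For anyone wanting to reproduce that tuning, my best guess at the relevant
abfs keys; verify them against your Hadoop release:)

   import org.apache.hadoop.conf.Configuration;

   class AbfsPrefetchTuning {
     static Configuration tuned() {
       Configuration conf = new Configuration();
       conf.setInt("fs.azure.readaheadqueue.depth", 4);               // prefetch depth
       conf.setInt("fs.azure.read.request.size", 128 * 1024 * 1024); // 128 MB blocks
       return conf;
     }
   }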






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541250#comment-17541250
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1135366352

   @steveloughran thank you very much for taking the time to review and provide 
feedback! 
   
   > 1. whose s3 client was used for testing here? If the s3a one, which hadoop
release?
   
   I was working with s3a -
 Spark 3.2.1
 Hadoop (Hadoop-aws) 3.3.2
 AWS SDK 1.11.655
 
   
   > 2. the azure abfs and gcs connectors do async prefetching of the next 
block, but are simply assuming that code will read sequentially; if there is 
another seek/readFully to a new location, those prefetches will be abandoned. 
there is work in s3a to do prefetching here with caching, so as to reduce the 
penalty of backwards seeks. https://issues.apache.org/jira/browse/HADOOP-18028
   
   I haven't worked with abfs or gcs. If the connectors do async pre-fetching,
that would be great. Essentially, the time the Parquet reader has to block in
the file system API would be reduced substantially. In that case, we could
turn the async reader on/off and rerun the benchmark to compare. From past
experience with MapR-FS, which had very aggressive read-ahead in its HDFS
client, I would still expect better parquet speeds. The fact that the prefetch
is turned off when a seek occurs is usual behaviour, but we may see no benefit
from the connector in that case. So a combination of async reader and async
connector might end up being a great solution (maybe at a slightly higher CPU
utilization). We would still have to run a benchmark to see the real effect.
   The async version in this PR takes care of the sequential read requirement
by a) opening a new stream for each column and ensuring every column is read
sequentially (footers are read using a separate stream; except for the footer,
no other stream ever seeks to a new location), and b) predetermining the
amount of data to be read, so there is never a read-ahead that is discarded.
   
   > 
   > hadoop is adding a vectored IO api intended for libraries like orc and 
parquet to be able to use, where the application provides an unordered list of 
ranges, a bytebuffer supplier and gets back a list of futures to wait for. the 
base implementation simply reads using the readFully API. s3a (and later abfs) will
do full async retrieval itself, using the http connection pool. 
https://issues.apache.org/jira/browse/HADOOP-18103
   > 
   > both vectored io and s3a prefetching will ship this summer in hadoop 
3.4.0. i don't see this change conflicting with this, though they may obsolete 
a lot of it.
   
   Yes, I became aware of this recently. I'm discussing integration of these
efforts in a separate channel. At the moment I see no conflict, but I have yet
to determine how much of this async work would need to change. I suspect we
may be able to eliminate or vastly simplify `AsyncMultiBufferInputStream`.
   
   > have you benchmarked this change with abfs or google gcs connectors to see 
what difference it makes there?
   
   No I have not. Would love help from anyone in the community with access to 
these. I only have access to S3.
   
   





[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541248#comment-17541248
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r879902468


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue<Future<Void>> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();
+  private LongAdder totalCountBlocked = new LongAdder();
+  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream 
fileInputStream,
+    List<ByteBuffer> buffers) {
+super(buffers);
+this.fileInputStream = fileInputStream;
+this.threadPool = threadPool;
+readFutures = new LinkedBlockingQueue<>(buffers.size());
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC: Begin read into buffers ");
+  for (ByteBuffer buf : buffers) {
+LOG.debug("ASYNC: buffer {} ", buf);
+  }
+}
+fetchAll();
+  }
+
+  private void checkState() {
+if (closed) {
+  throw new RuntimeException("Stream is closed");
+}
+  }
+
+  private void fetchAll() {
+checkState();
+submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+ByteBuffer buffer = buffers.get(bufferNo);
+try {
+  readFutures.put(threadPool.submit(() -> {
+  readOneBuffer(buffer);
+  if (bufferNo < buffers.size() - 1) {
+submitReadTask(bufferNo + 1);
+  }
+  return null;
+})
+  );
+} catch (InterruptedException e) {
+  Thread.currentThread().interrupt();
+  throw new RuntimeException(e);
+}
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+long startTime = System.nanoTime();
+try {
+  fileInputStream.readFully(buffer);
+  buffer.flip();
+  long readCompleted = System.nanoTime();
+  long timeSpent = readCompleted - startTime;
+  LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+  long putStart = System.nanoTime();
+  long putCompleted = System.nanoTime();
+  LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}",

Review Comment:
   It doesn't. Sorry, this got left behind after a cleanup. 



##
parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java:
##
@@ -0,0 +1,269 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, 

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541021#comment-17541021
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

steveloughran commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r879586630


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue<Future<Void>> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();
+  private LongAdder totalCountBlocked = new LongAdder();
+  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream 
fileInputStream,
+    List<ByteBuffer> buffers) {
+super(buffers);
+this.fileInputStream = fileInputStream;
+this.threadPool = threadPool;
+readFutures = new LinkedBlockingQueue<>(buffers.size());
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC: Begin read into buffers ");
+  for (ByteBuffer buf : buffers) {
+LOG.debug("ASYNC: buffer {} ", buf);
+  }
+}
+fetchAll();
+  }
+
+  private void checkState() {
+if (closed) {
+  throw new RuntimeException("Stream is closed");
+}
+  }
+
+  private void fetchAll() {
+checkState();
+submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+ByteBuffer buffer = buffers.get(bufferNo);
+try {
+  readFutures.put(threadPool.submit(() -> {
+  readOneBuffer(buffer);
+  if (bufferNo < buffers.size() - 1) {
+submitReadTask(bufferNo + 1);
+  }
+  return null;
+})
+  );
+} catch (InterruptedException e) {
+  Thread.currentThread().interrupt();
+  throw new RuntimeException(e);
+}
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+long startTime = System.nanoTime();
+try {
+  fileInputStream.readFully(buffer);
+  buffer.flip();
+  long readCompleted = System.nanoTime();
+  long timeSpent = readCompleted - startTime;
+  LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+  long putStart = System.nanoTime();
+  long putCompleted = System.nanoTime();
+  LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}",

Review Comment:
   how does this work?



##
parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java:
##
@@ -0,0 +1,269 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the 

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541016#comment-17541016
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

steveloughran commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1134843705

   1. whose s3 client was used for testing here? If the s3a one, which hadoop
release?
   2. the azure abfs and gcs connectors do async prefetching of the next block, 
but are simply assuming that code will read sequentially; if there is another 
seek/readFully to a new location, those prefetches will be abandoned. there is 
work in s3a to do prefetching here with caching, so as to reduce the penalty of 
backwards seeks. https://issues.apache.org/jira/browse/HADOOP-18028
   
   hadoop is adding a vectored IO api intended for libraries like orc and 
parquet to be able to use, where the application provides an unordered list of 
ranges, a bytebuffer supplier and gets back a list of futures to wait for. the 
base implementation simply reads using the readFully API. s3a (and later abfs) will
do full async retrieval itself, using the http connection pool.
https://issues.apache.org/jira/browse/HADOOP-18103
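   The shape of that API as described in HADOOP-18103 (a sketch against the
proposed interface; final names may differ):

   import java.nio.ByteBuffer;
   import java.util.Arrays;
   import java.util.List;
   import org.apache.hadoop.fs.FSDataInputStream;
   import org.apache.hadoop.fs.FileRange;

   class VectoredReadSketch {
     static void readTwoRanges(FSDataInputStream in) throws Exception {
       // An unordered list of ranges plus a buffer supplier; each range
       // carries its own future that completes as the bytes land.
       List<FileRange> ranges = Arrays.asList(
           FileRange.createFileRange(0, 4096),
           FileRange.createFileRange(1 << 20, 4096));
       in.readVectored(ranges, ByteBuffer::allocate);
       for (FileRange r : ranges) {
         ByteBuffer data = r.getData().get();
       }
     }
   }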
   
   both vectored io and s3a prefetching will ship this summer in hadoop 3.4.0. 
i don't see this change conflicting with this, though they may obsolete a lot 
of it.
   
   have you benchmarked this change with abfs or google gcs connectors to see 
what difference it makes there?






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17540203#comment-17540203
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

kbendick commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r878332111


##
parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java:
##
@@ -37,4 +39,9 @@ public interface PageReader {
* @return the next page in that chunk or null if after the last page
*/
   DataPage readPage();
+
+  /**
+   * Close the page reader. By default it is no-op.
+   */
+  default void close() throws IOException {}

Review Comment:
   Cool. I know a lot of times I check if something is an instance of
Closeable and then call close.
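   i.e., the caller-side probe the default method makes unnecessary (a sketch):

   import java.io.Closeable;
   import java.io.IOException;

   class CloseSketch {
     static void closeIfCloseable(Object reader) throws IOException {
       // Pre-change pattern: probe for Closeable before closing.
       if (reader instanceof Closeable) {
         ((Closeable) reader).close();
       }
       // With a default no-op close() on the interface, reader.close() suffices.
     }
   }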







[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17540200#comment-17540200
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r877587287


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -21,12 +21,15 @@
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.HashMap;
+import java.util.Iterator;

Review Comment:
   Yeah. This and a bunch of other imports are not used. Removing them all.



##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  final SeekableInputStream fileInputStream;
+  int fetchIndex = 0;
+  int readIndex = 0;
+  ExecutorService threadPool;
+  LinkedBlockingQueue<Future<Void>> readFutures;
+  boolean closed = false;
+
+  LongAdder totalTimeBlocked = new LongAdder();
+  LongAdder totalCountBlocked = new LongAdder();
+  LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);

Review Comment:
   Done



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -21,12 +21,15 @@
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.PrimitiveIterator;
 import java.util.Queue;
 
+import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.parquet.ParquetRuntimeException;

Review Comment:
   Removed







[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539724#comment-17539724
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

kazuyukitanimura commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r877368066


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -21,12 +21,15 @@
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.PrimitiveIterator;
 import java.util.Queue;
 
+import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.parquet.ParquetRuntimeException;

Review Comment:
   Also I am not sure if `ParquetRuntimeException` is used...



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -21,12 +21,15 @@
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.HashMap;
+import java.util.Iterator;

Review Comment:
   I can't seem to find where `Iterator` is used... Should we remove it if
that is the case?



##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  final SeekableInputStream fileInputStream;
+  int fetchIndex = 0;
+  int readIndex = 0;
+  ExecutorService threadPool;
+  LinkedBlockingQueue<Future<Void>> readFutures;
+  boolean closed = false;
+
+  LongAdder totalTimeBlocked = new LongAdder();
+  LongAdder totalCountBlocked = new LongAdder();
+  LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);

Review Comment:
   Does it make sense to change these to `private` if they are not accessed 
from other places?



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -21,12 +21,15 @@
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.PrimitiveIterator;
 import java.util.Queue;

Review Comment:
   I think `Queue` is no longer used with this change. Should we remove the 
import?






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539121#comment-17539121
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130698923

   @theosib-amazon I applied my PR on top of your PR, ran through some tests
using Spark, and hit no issues. (All unit tests passed as well.)






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539119#comment-17539119
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876197602


##
parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java:
##
@@ -37,4 +39,9 @@ public interface PageReader {
   * @return the next page in that chunk or null if after the last page
   */
  DataPage readPage();
+
+  /**
+   * Close the page reader. By default it is a no-op.
+   */
+  default void close() throws IOException {}

Review Comment:
   Sure.



##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,173 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  final SeekableInputStream fileInputStream;
+  int fetchIndex = 0;
+  int readIndex = 0;
+  ExecutorService threadPool;
+  LinkedBlockingQueue<Future<Void>> readFutures;
+  boolean closed = false;
+  Exception ioException;
+
+  LongAdder totalTimeBlocked = new LongAdder();
+  LongAdder totalCountBlocked = new LongAdder();
+  LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+      List<ByteBuffer> buffers) {
+    super(buffers);
+    this.fileInputStream = fileInputStream;
+    this.threadPool = threadPool;
+    readFutures = new LinkedBlockingQueue<>(buffers.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ASYNC: Begin read into buffers ");
+      for (ByteBuffer buf : buffers) {
+        LOG.debug("ASYNC: buffer {} ", buf);
+      }
+    }
+    fetchAll();
+  }
+
+  private void checkState() {
+    if (closed) {
+      throw new RuntimeException("Stream is closed");
+    }
+    synchronized (this) {
+      if (ioException != null) {
+        throw new RuntimeException(ioException);
+      }
+    }
+  }
+
+  private void fetchAll() {
+    checkState();
+    submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+    ByteBuffer buffer = buffers.get(bufferNo);
+    try {
+      readFutures.put(threadPool.submit(() -> {
+          readOneBuffer(buffer);
+          if (bufferNo < buffers.size() - 1) {
+            submitReadTask(bufferNo + 1);
+          }
+          return null;
+        })
+      );
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+    long startTime = System.nanoTime();
+    try {
+      fileInputStream.readFully(buffer);
+      buffer.flip();
+      long readCompleted = System.nanoTime();
+      long timeSpent = readCompleted - startTime;
+      LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+      long putStart = System.nanoTime();
+      long putCompleted = System.nanoTime();
+      LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}",
+          (putCompleted - putStart) / 1000.0);
+      fetchIndex++;
+    } catch (IOException e) {
+      // Save the exception so that the calling thread can check if something went wrong.
+      // checkState will throw an exception if the read task has failed.
+      synchronized (this) {
+        ioException = e;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean nextBuffer() {
+

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538991#comment-17538991
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130327546

   > @parthchandra One thing that confuses me a bit is that these buffers have 
only ByteBuffer inside them. There's no actual I/O, so it's not possible to 
block. Do you have subclasses that provide some sort of access to real I/O?
   
   Good point. `MultiBufferInputStream` is constructed using buffers that have 
been filled already. `AsyncMultiBufferInputStream` takes an input stream as a 
parameter in the constructor and performs the IO itself. In 
`ByteBufferInputStream` I added
   ```
  public static ByteBufferInputStream wrapAsync(ExecutorService threadPool,
      SeekableInputStream fileInputStream, List<ByteBuffer> buffers) {
    return new AsyncMultiBufferInputStream(threadPool, fileInputStream, buffers);
  }
   ``` 
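   For illustration, a hypothetical caller might look like the sketch below; the pool size, buffer count/size, and the `columnChunkStream` variable (a `SeekableInputStream` positioned at the data to read) are assumptions for the example:

   ```java
   // Hypothetical usage sketch for wrapAsync; sizes are illustrative only.
   ExecutorService ioPool = Executors.newFixedThreadPool(4);
   List<ByteBuffer> buffers = new ArrayList<>();
   for (int i = 0; i < 8; i++) {
     buffers.add(ByteBuffer.allocate(1 << 20)); // eight 1MB buffers
   }
   // Read tasks are submitted to ioPool immediately; the returned stream
   // blocks only when it needs a buffer that has not been filled yet.
   ByteBufferInputStream stream =
       ByteBufferInputStream.wrapAsync(ioPool, columnChunkStream, buffers);
   ```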






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538983#comment-17538983
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

kbendick commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876172260


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,173 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  final SeekableInputStream fileInputStream;
+  int fetchIndex = 0;
+  int readIndex = 0;
+  ExecutorService threadPool;
+  LinkedBlockingQueue<Future<Void>> readFutures;
+  boolean closed = false;
+  Exception ioException;
+
+  LongAdder totalTimeBlocked = new LongAdder();
+  LongAdder totalCountBlocked = new LongAdder();
+  LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+      List<ByteBuffer> buffers) {
+    super(buffers);
+    this.fileInputStream = fileInputStream;
+    this.threadPool = threadPool;
+    readFutures = new LinkedBlockingQueue<>(buffers.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ASYNC: Begin read into buffers ");
+      for (ByteBuffer buf : buffers) {
+        LOG.debug("ASYNC: buffer {} ", buf);
+      }
+    }
+    fetchAll();
+  }
+
+  private void checkState() {
+    if (closed) {
+      throw new RuntimeException("Stream is closed");
+    }
+    synchronized (this) {
+      if (ioException != null) {
+        throw new RuntimeException(ioException);
+      }
+    }
+  }
+
+  private void fetchAll() {
+    checkState();
+    submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+    ByteBuffer buffer = buffers.get(bufferNo);
+    try {
+      readFutures.put(threadPool.submit(() -> {
+          readOneBuffer(buffer);
+          if (bufferNo < buffers.size() - 1) {
+            submitReadTask(bufferNo + 1);
+          }
+          return null;
+        })
+      );
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+    long startTime = System.nanoTime();
+    try {
+      fileInputStream.readFully(buffer);
+      buffer.flip();
+      long readCompleted = System.nanoTime();
+      long timeSpent = readCompleted - startTime;
+      LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+      long putStart = System.nanoTime();
+      long putCompleted = System.nanoTime();
+      LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}",
+          (putCompleted - putStart) / 1000.0);
+      fetchIndex++;
+    } catch (IOException e) {
+      // Save the exception so that the calling thread can check if something went wrong.
+      // checkState will throw an exception if the read task has failed.
+      synchronized (this) {
+        ioException = e;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean nextBuffer() {
+    checkState();
+    // hack: parent constructor can call this method before this class is fully initialized.
+    // Just return without doing anything.
+    if (readFutures == null) {
+      return false;
+    }

Review Comment:
   Because `checkState` has synchronization, would it be safe to move the 
`checkState` before this somehow or add some kind of less expensive boolean 
check that we can set 
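   For illustration, one hypothetical shape of that suggestion (the flag name and placement are assumptions, not the PR's code):

   ```java
   // Sketch: a cheap volatile flag replaces the readFutures null check.
   // `initialized` would be set exactly once, at the end of the subclass constructor.
   private volatile boolean initialized = false;

   @Override
   public boolean nextBuffer() {
     // Fast path: the parent constructor can call nextBuffer() before this
     // subclass finishes initializing; bail out without taking any locks.
     if (!initialized) {
       return false;
     }
     checkState();
     // ... continue with the normal (possibly blocking) logic ...
     return super.nextBuffer();
   }
   ```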

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538979#comment-17538979
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

kbendick commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876165378


##
parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java:
##
@@ -37,4 +39,9 @@ public interface PageReader {
   * @return the next page in that chunk or null if after the last page
   */
  DataPage readPage();
+
+  /**
+   * Close the page reader. By default it is a no-op.
+   */
+  default void close() throws IOException {}

Review Comment:
   Should we add `Closeable` as an implemented interface?
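   A hedged sketch of that suggestion (other existing methods elided; assumes no implementation already declares an incompatible `close()`):

   ```java
   import java.io.Closeable;
   import java.io.IOException;

   public interface PageReader extends Closeable {

     /** @return the next page in that chunk or null if after the last page */
     DataPage readPage();

     /** Close the page reader. By default it is a no-op. */
     @Override
     default void close() throws IOException {}
   }
   ```

   Extending `Closeable` would also let callers manage readers with try-with-resources.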







[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538969#comment-17538969
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

theosib-amazon commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130275378

   @parthchandra One thing that confuses me a bit is that these buffers have 
only ByteBuffer inside them. There's no actual I/O, so it's not possible to 
block. Do you have subclasses that provide some sort of access to real I/O?






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538967#comment-17538967
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130270383

   
   > @parthchandra Would you mind having a look at my I/O performance 
optimization plan for ParquetMR? I think we should coordinate, since we have 
some ideas that might overlap what we touch.
   > 
https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing
   
   @theosib-amazon I read your document and went through #960. It looks like, for the most part, #960 and this PR complement each other. The overlap I see is in the changes to `MultiBufferInputStream`, where you have added the `readFully` and `skipFully` APIs. The bulk of my changes for async IO are in a class derived from `MultiBufferInputStream`, and the heart of the changes depends on overriding `MultiBufferInputStream.nextBuffer`. In `MultiBufferInputStream.nextBuffer` the assumption is that all the buffers have already been read into. In `AsyncMultiBufferInputStream.nextBuffer` this assumption is removed, and the call *blocks* only if the next required buffer has not been read into.
   Now, `skipFully` and `readFully` are potentially blocking calls because both call `nextBuffer` repeatedly if necessary. To gain maximum pipelining, you want to make calls to `skipFully` and `readFully` such that you never block for too long (or at all) in the call. You will get this if you are skipping or reading less than the number of bytes in a single buffer. This is generally the case, as decompression and decoding operate at the page level, and a page is smaller than a single buffer. However, for your optimizations, you should be aware of this behaviour.
   From what I see, I don't think there will be a conflict. I'll pull in your PR and give it a deeper look.
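   To make the blocking behaviour concrete, here is a hedged sketch (not the PR's exact code) of how `AsyncMultiBufferInputStream.nextBuffer` can block only when the next required buffer is still in flight; it relies on exactly one read future being queued per buffer, in order:

   ```java
   @Override
   public boolean nextBuffer() {
     checkState();
     // One read task was queued per buffer, in order, so waiting on the next
     // future guarantees the buffer being advanced to has been filled. If the
     // reads have raced ahead of the consumer, get() returns immediately.
     if (readIndex < buffers.size()) {
       long start = System.nanoTime();
       try {
         readFutures.take().get();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException(e);
       } catch (ExecutionException e) {
         throw new RuntimeException(e.getCause());
       }
       long blockedFor = System.nanoTime() - start;
       totalCountBlocked.increment();
       totalTimeBlocked.add(blockedFor);
       maxTimeBlocked.accumulate(blockedFor);
       readIndex++;
     }
     return super.nextBuffer();
   }
   ```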
   






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538945#comment-17538945
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130229884

   > Great effort! Will have a look after the build succeeds.

   @shangxinli I have no idea how to get the failing CI to pass. These failures appear to be in unrelated areas, caused by some infra issues. Is there a way to trigger a rerun?
   






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538934#comment-17538934
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

dbtsai commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130214186

   cc @rdblue @gszadovszky @ggershinsky






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538912#comment-17538912
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

theosib-amazon commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130176799

   @parthchandra Would you mind having a look at my I/O performance 
optimization plan for ParquetMR? I think we should coordinate, since we have 
some ideas that might overlap what we touch.
   
https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing
   






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538495#comment-17538495
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1129363998

   I have some numbers from an internal benchmark using Spark. I didn't see any 
benchmarks in the Parquet codebase that I could reuse.
   
   Here are the numbers from my own benchmark - 
   
   - 10 runs; each run reads all columns from store_sales (the largest table)
     in the TPC-DS (100G) dataset: `spark.sql("select * from store_sales")`
   - Sync reader with default 8MB buffer size, Async reader with 1MB buffer
     size (achieves better pipelining)
   - Run on a MacBook Pro, reading from S3. Spark has 6 cores.
   - All times in seconds

   | Run | Async | Sync | Async (w/o outliers) | Sync (w/o outliers) |
   | ---:| ---:| ---:| ---:| ---:|
   | 1 | 84 | 102 | - | - |
   | 2 | 90 | 366 | 90 | 366 |
   | 3 | 78 | 156 | - | 156 |
   | 4 | 84 | 128 | 84 | - |
   | 5 | 108 | 402 | - | - |
   | 6 | 90 | 432 | 90 | - |
   | 7 | 84 | 378 | 84 | 378 |
   | 8 | 108 | 324 | - | 324 |
   | 9 | 90 | 318 | 90 | 318 |
   | 10 | 90 | 282 | 90 | 282 |
   | Average | 90.6 | 288.8 | 88 | 304 |
   | Median | 90 | 321 | **90** | **321** |
   | StdDev | 9.98 | 119. | | |
   
   
   After removing the two highest and two lowest runs for each case, and taking 
the median value:
   
   Async: 90 sec
   Sync: 321 sec
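
   Taken at face value, these medians work out to roughly:

   ```
   speedup = sync median / async median = 321 s / 90 s ≈ 3.6x
   ```

   which is consistent with (indeed a bit above) the 2x-3x improvement previously seen with Drill's async page reader.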
   
   






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538363#comment-17538363
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

dbtsai commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1129148390

   @parthchandra do you have performance benchmark? Thanks






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538331#comment-17538331
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1129106516

   Does anyone know why the CI checks are failing with a SocketTimeout exception, and what can be done to address it?






[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537813#comment-17537813
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra opened a new pull request, #968:
URL: https://github.com/apache/parquet-mr/pull/968

   
   
   ### Jira
   
   This PR addresses the following 
[PARQUET-2149](https://issues.apache.org/jira/browse/PARQUET-2149): Implement 
async IO for Parquet file reader
   
   ### Tests
   
   This PR adds the following unit tests:
   - AsyncMultiBufferInputStream.*
   - TestMultipleWriteRead.testReadWriteAsync
   - TestColumnChunkPageWriteStore.testAsync

   The PR is also tested by changing the default configuration to make all reads async and then ensuring all unit tests pass.
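
   As an illustration only, a sketch of how such a default might be flipped from test code; the class and property key below are assumptions for the sketch, not the PR's actual configuration name:

   ```java
   import org.apache.hadoop.conf.Configuration;

   public class AsyncReadToggle {
     public static Configuration withAsyncReads() {
       Configuration conf = new Configuration();
       // Hypothetical property key, illustrative only; not the PR's real switch.
       conf.setBoolean("parquet.read.async.enabled", true);
       return conf;
     }
   }
   ```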
   
   



