We've actually POC'd a design for this around PubSub, so it sounds like we're all thinking along the same lines. I just wanted to make sure I'd exhausted all the options for multi-threaded client reads of Parquet files w/ PyArrow. It sounds like I have, and PubSub is the best approach.
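For the archives, here's roughly the shape of the design we POC'd. This is a sketch only, not our actual code: it assumes Google Cloud Pub/Sub, placeholder project/topic/subscription names, and a placeholder process_batch; each batch travels over the wire in the Arrow IPC stream format.

    import pyarrow as pa
    import pyarrow.parquet as pq
    from google.cloud import pubsub_v1

    # --- Publisher side: one process reads the file and publishes batches ---

    def batch_to_bytes(batch):
        # Serialize a single RecordBatch using the Arrow IPC stream format.
        sink = pa.BufferOutputStream()
        with pa.ipc.new_stream(sink, batch.schema) as writer:
            writer.write_batch(batch)
        return sink.getvalue().to_pybytes()

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path("my-project", "parquet-batches")  # placeholders

    pqf = pq.ParquetFile("example.parquet", memory_map=True)
    for batch in pqf.iter_batches(batch_size=1024, use_pandas_metadata=True):
        # Pub/Sub caps messages at 10 MB, so batch_size has to keep each
        # serialized batch under that limit.
        publisher.publish(topic_path, data=batch_to_bytes(batch)).result()

    # --- Worker side: each worker deserializes and processes batches ---

    def process_batch(batch):
        pass  # model-specific work goes here

    def callback(message):
        reader = pa.ipc.open_stream(pa.BufferReader(message.data))
        process_batch(reader.read_next_batch())
        message.ack()

    subscriber = pubsub_v1.SubscriberClient()
    sub_path = subscriber.subscription_path("my-project", "parquet-batches-sub")
    subscriber.subscribe(sub_path, callback=callback).result()  # blocks

The nice property vs. the in-process queue is that workers can live in separate processes (or machines), so they don't have to share the model object.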
On Sat, Feb 5, 2022 at 9:42 PM Micah Kornfield <[email protected]> wrote:

> I was going to suggest something similar to Weston's advice if the
> bottleneck is downstream processing from Parquet. Looking at the APIs
> linked, it's not entirely clear to me how well that would work.
>
> On Thu, Feb 3, 2022 at 11:58 PM Weston Pace <[email protected]> wrote:
>
>> If you are aiming to speed up processing / consumption of batches, then
>> you can use a queue. For example:
>>
>> # Once per client
>> pqf = pq.ParquetFile(f, memory_map=True)
>> # maxsize will control how much you'll buffer in RAM
>> queue = queue.Queue(maxsize=32)
>> start_workers(queue)
>> for batch in pqf.iter_batches(self.rows_per_batch,
>>                               use_pandas_metadata=True):
>>     queue.put(batch)
>> queue.join()
>> stop_workers()
>>
>> # Each worker would do
>> while not stopped:
>>     next_batch = queue.get()
>>     process_batch(next_batch)
>>     queue.task_done()
>>
>> On Thu, Feb 3, 2022 at 8:03 AM Cindy McMullen <[email protected]> wrote:
>>
>>> We can't use Beam to parallelize multiple file reads, b/c
>>> GraphDataLoader is specific to the model being trained. So multiple
>>> Beam processes can't share the same model (until we move into DGL
>>> distributed mode later this year).
>>>
>>> We're trying to optimize the throughput with which the GraphDataLoader
>>> consumes and processes these Parquet files.
>>>
>>> On Thu, Feb 3, 2022 at 11:01 AM Cindy McMullen <[email protected]> wrote:
>>>
>>>> The use case is for a GraphDataLoader to run w/ multiple threads.
>>>> GraphDataLoader invokes its DGLDataset, which loads these Parquet
>>>> files to convert into DGL-compatible graph objects.
>>>>
>>>> On Thu, Feb 3, 2022 at 10:00 AM Micah Kornfield <[email protected]> wrote:
>>>>
>>>>> Hi Cindy,
>>>>>
>>>>>> I'd like to ingest batches within a Parquet file in parallel.
>>>>>
>>>>> What is the motivation here? Is it speeding up Parquet reading, or
>>>>> processing after the fact?
>>>>>
>>>>> Side note: the size of your row groups seems quite small (it might
>>>>> be right for your specific use case).
>>>>>
>>>>> Cheers,
>>>>> Micah
>>>>>
>>>>> On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen <[email protected]> wrote:
>>>>>
>>>>>> Maybe -- will give it a try. Thanks for the suggestion.
>>>>>>
>>>>>> On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <[email protected]> wrote:
>>>>>>
>>>>>>> There is a parameter to iter_batches where you can pass in the
>>>>>>> row_group number, or a list of row groups. Would this help to
>>>>>>> read the Parquet file in parallel?
>>>>>>>
>>>>>>> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi -
>>>>>>>>
>>>>>>>> I'd like to ingest batches within a Parquet file in parallel.
>>>>>>>> The client (DGLDataset) needs to be thread-safe. What's the best
>>>>>>>> API for me to use to do so?
>>>>>>>>
>>>>>>>> Here's the metadata for one example file:
>>>>>>>>
>>>>>>>> <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
>>>>>>>>   created_by: parquet-mr version 1.12.0 (build db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>>>>>>>>   num_columns: 4
>>>>>>>>   num_rows: 1000000
>>>>>>>>   num_row_groups: 9997
>>>>>>>>   format_version: 1.0
>>>>>>>>   serialized_size: 17824741
>>>>>>>>
>>>>>>>> I want the consumption of batches to be distributed among
>>>>>>>> multiple workers. I'm currently trying something like this:
>>>>>>>>
>>>>>>>> # Once per client
>>>>>>>> pqf = pq.ParquetFile(f, memory_map=True)
>>>>>>>>
>>>>>>>> # Ideally, each worker could do this, but ParquetFile.iter_batches
>>>>>>>> # is not thread-safe. This makes intuitive sense.
>>>>>>>> pq_batches = pqf.iter_batches(self.rows_per_batch,
>>>>>>>>                               use_pandas_metadata=True)
>>>>>>>>
>>>>>>>> My workaround is to buffer these ParquetFile batches into a list
>>>>>>>> of DataFrames, but this is memory-intensive, so it will not scale
>>>>>>>> to multiple of these input files.
>>>>>>>>
>>>>>>>> What's a better PyArrow pattern to use so I can distribute
>>>>>>>> batches to my workers in a thread-safe manner?
>>>>>>>>
>>>>>>>> Thanks --
>>>>>>>
>>>>>>> --
>>>>>>> Partha Dutta
>>>>>>> [email protected]
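For anyone who finds this thread later: here is a filled-in, self-contained version of Weston's queue sketch above. The start_workers / stop_workers pieces he elided are spelled out with threading; the file name, batch size, worker count, and process_batch are placeholders.

    import queue
    import threading

    import pyarrow.parquet as pq

    NUM_WORKERS = 4
    SENTINEL = None  # tells a worker to shut down

    def process_batch(batch):
        pass  # model-specific work goes here

    def worker(q):
        while True:
            item = q.get()
            try:
                if item is SENTINEL:
                    return
                process_batch(item)
            finally:
                q.task_done()

    # maxsize bounds how many batches sit in RAM at once
    q = queue.Queue(maxsize=32)
    workers = [threading.Thread(target=worker, args=(q,))
               for _ in range(NUM_WORKERS)]
    for w in workers:
        w.start()

    # Only this thread touches iter_batches, so its lack of
    # thread-safety never comes into play.
    pqf = pq.ParquetFile("example.parquet", memory_map=True)
    for batch in pqf.iter_batches(batch_size=1024, use_pandas_metadata=True):
        q.put(batch)

    q.join()  # wait until every queued batch has been processed
    for _ in workers:
        q.put(SENTINEL)
    for w in workers:
        w.join()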
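And, for comparison, a sketch of Partha's row-group suggestion: each thread opens its own ParquetFile and reads a disjoint set of row groups via the row_groups argument to iter_batches, so no state is shared between threads. Path, batch size, worker count, and process_batch are again placeholders.

    import threading

    import pyarrow.parquet as pq

    PATH = "example.parquet"  # placeholder
    NUM_WORKERS = 4

    def process_batch(batch):
        pass  # model-specific work goes here

    def read_row_groups(row_groups):
        # Each thread gets its own ParquetFile handle, so nothing is shared.
        pqf = pq.ParquetFile(PATH, memory_map=True)
        for batch in pqf.iter_batches(batch_size=1024, row_groups=row_groups,
                                      use_pandas_metadata=True):
            process_batch(batch)

    num_groups = pq.ParquetFile(PATH).metadata.num_row_groups
    # Deal row groups out round-robin so each thread gets a disjoint slice.
    assignments = [list(range(num_groups))[i::NUM_WORKERS]
                   for i in range(NUM_WORKERS)]
    threads = [threading.Thread(target=read_row_groups, args=(rg,))
               for rg in assignments]
    for t in threads:
        t.start()
    for t in threads:
        t.join()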
