I was going to suggest something similar to Weston's advice if the
bottleneck is downstream processing from Parquet. Looking at the APIs
linked, it's not entirely clear to me how well that would work.

On Thu, Feb 3, 2022 at 11:58 PM Weston Pace <[email protected]> wrote:

> If you are aiming to speed up processing / consumption of batches then
> you can use a queue.  For example:
>
> import queue
>
> # Once per client
> pqf = pq.ParquetFile(f, memory_map=True)
> # maxsize will control how much you all buffer in RAM
> batch_queue = queue.Queue(maxsize=32)
> start_workers(batch_queue)
> for batch in pqf.iter_batches(rows_per_batch, use_pandas_metadata=True):
>     batch_queue.put(batch)
> batch_queue.join()
> stop_workers(batch_queue)
>
> # Each worker would do
> while not stopped:
>     next_batch = batch_queue.get()
>     process_batch(next_batch)
>     batch_queue.task_done()
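>
> start_workers / stop_workers are left abstract above. Here is a minimal,
> untested sketch of what they might look like with threading, replacing
> the "while not stopped" loop with a sentinel object; every name in it is
> a placeholder, not a PyArrow API:
>
> import queue
> import threading
>
> _SENTINEL = object()  # placeholder value that tells a worker to exit
> _workers = []
>
> def start_workers(batch_queue, num_workers=4):
>     for _ in range(num_workers):
>         t = threading.Thread(target=_worker_loop, args=(batch_queue,))
>         t.start()
>         _workers.append(t)
>
> def _worker_loop(batch_queue):
>     while True:
>         next_batch = batch_queue.get()
>         try:
>             if next_batch is _SENTINEL:
>                 return
>             process_batch(next_batch)
>         finally:
>             batch_queue.task_done()
>
> def stop_workers(batch_queue):
>     # Queue.join() above returns once every real batch is processed;
>     # the sentinels then let each thread fall out of its loop.
>     for _ in _workers:
>         batch_queue.put(_SENTINEL)
>     for t in _workers:
>         t.join()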
>
> On Thu, Feb 3, 2022 at 8:03 AM Cindy McMullen <[email protected]>
> wrote:
> >
> > We can't use Beam to parallelize multiple file reads, b/c
> GraphDataLoader is specific to the model being trained.  So multiple Beam
> processes can't share the same model (until we move into DGL distributed
> mode later this year).
> >
> > We're trying to optimize the throughput at which the GraphDataLoader
> consumes and processes these Parquet files.
> >
> > On Thu, Feb 3, 2022 at 11:01 AM Cindy McMullen <[email protected]>
> wrote:
> >>
> >> The use case is for a GraphDataLoader to run w/ multiple threads.
> GraphDataLoader invokes its DGLDataset, which loads these Parquet files
> and converts them into DGL-compatible graph objects.
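> >>
> >> Roughly, a DGLDataset along these lines (column names, batch size, and
> >> the edge-list layout are made up for illustration):
> >>
> >> import dgl
> >> import pyarrow.parquet as pq
> >> import torch
> >>
> >> class ParquetGraphDataset(dgl.data.DGLDataset):
> >>     def __init__(self, path):
> >>         self.path = path
> >>         super().__init__(name="parquet_graph")
> >>
> >>     def process(self):
> >>         # Read an edge list and build one DGL graph per batch.
> >>         pqf = pq.ParquetFile(self.path, memory_map=True)
> >>         self.graphs = []
> >>         for batch in pqf.iter_batches(100_000):
> >>             df = batch.to_pandas()
> >>             g = dgl.graph((torch.tensor(df["src"].values),
> >>                            torch.tensor(df["dst"].values)))
> >>             self.graphs.append(g)
> >>
> >>     def __getitem__(self, idx):
> >>         return self.graphs[idx]
> >>
> >>     def __len__(self):
> >>         return len(self.graphs)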
> >>
> >>
> >> On Thu, Feb 3, 2022 at 10:00 AM Micah Kornfield <[email protected]>
> wrote:
> >>>
> >>> Hi Cindy,
> >>>>
> >>>> I'd like to ingest batches within a Parquet file in parallel.
> >>>
> >>> What is the motivation here?  Is it speeding up Parquet reading or
> processing after the fact?
> >>>
> >>>
> >>> Side note, the size of your row groups seems quite small: about 100
> rows each, given 1,000,000 rows across 9,997 row groups (it might be
> right for your specific use-case).
> >>>
> >>> Cheers,
> >>> Micah
> >>>
> >>> On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen <[email protected]>
> wrote:
> >>>>
> >>>> Maybe -- will give it a try.  Thanks for the suggestion.
> >>>>
> >>>> On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <[email protected]>
> wrote:
> >>>>>
> >>>>> There is a row_groups parameter to iter_batches where you can pass
> in a list of row-group indices. Would this help to read the Parquet file
> in parallel?
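> >>>>>
> >>>>> An untested sketch of that idea: each worker opens its own
> >>>>> ParquetFile (so no reader state is shared) and pulls a disjoint
> >>>>> slice of row groups. Worker count and process_batch are placeholders:
> >>>>>
> >>>>> import pyarrow.parquet as pq
> >>>>>
> >>>>> def read_my_row_groups(path, worker_id, num_workers, batch_size):
> >>>>>     pqf = pq.ParquetFile(path, memory_map=True)
> >>>>>     # e.g. worker 0 of 4 handles row groups 0, 4, 8, ...
> >>>>>     mine = list(range(worker_id, pqf.num_row_groups, num_workers))
> >>>>>     for batch in pqf.iter_batches(batch_size=batch_size,
> >>>>>                                   row_groups=mine,
> >>>>>                                   use_pandas_metadata=True):
> >>>>>         process_batch(batch)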
> >>>>>
> >>>>> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <[email protected]>
> wrote:
> >>>>>>
> >>>>>> Hi -
> >>>>>>
> >>>>>> I'd like to ingest batches within a Parquet file in parallel.  The
> client (DGLDataset) needs to be thread-safe.  What's the best API for me to
> use to do so?
> >>>>>>
> >>>>>> Here's the metadata for one example file:
> >>>>>>
> >>>>>>   <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
> >>>>>>   created_by: parquet-mr version 1.12.0 (build
> db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
> >>>>>>   num_columns: 4
> >>>>>>   num_rows: 1000000
> >>>>>>   num_row_groups: 9997
> >>>>>>   format_version: 1.0
> >>>>>>   serialized_size: 17824741
> >>>>>>
> >>>>>> I want the consumption of batches to be distributed among multiple
> workers.  I'm currently trying something like this:
> >>>>>>
> >>>>>> import pyarrow.parquet as pq
> >>>>>>
> >>>>>> # Once per client
> >>>>>> pqf = pq.ParquetFile(f, memory_map=True)
> >>>>>>
> >>>>>> # Ideally, each worker can do this, but ParquetFile.iter_batches
> >>>>>> # is not thread-safe.  This makes intuitive sense.
> >>>>>> pq_batches = pqf.iter_batches(self.rows_per_batch,
> >>>>>>                               use_pandas_metadata=True)
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> My workaround is to buffer these ParquetFile batches into a list
> of DataFrames, but this is memory-intensive, so it will not scale to
> multiple input files.
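> >>>>>>
> >>>>>> A minimal sketch of that workaround, reusing pqf from above (the
> >>>>>> whole file is materialized up front, which is what makes it
> >>>>>> memory-intensive):
> >>>>>>
> >>>>>> buffered_dfs = [batch.to_pandas()
> >>>>>>                 for batch in pqf.iter_batches(
> >>>>>>                     self.rows_per_batch, use_pandas_metadata=True)]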
> >>>>>>
> >>>>>> What's a better PyArrow pattern to use so I can distribute batches
> to my workers in a thread-safe manner?
> >>>>>>
> >>>>>> Thanks --
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Partha Dutta
> >>>>> [email protected]
>
