We've actually POC'd a design for this around PubSub, so it sounds like
we're all thinking along the same lines.
I just wanted to make sure I'd exhausted all possibilities of
multi-threaded Parquet file client reads w/ PyArrow. It sounds like I have,
and PubSub is the best approach.

On Sat, Feb 5, 2022 at 9:42 PM Micah Kornfield <[email protected]>
wrote:

> I was going to suggest something similar to Weston's advice if the
> bottleneck is downstream processing from Parquet.   Looking at the APIs
> linked, it's not entirely clear to me how well that would work.
>
> On Thu, Feb 3, 2022 at 11:58 PM Weston Pace <[email protected]> wrote:
>
>> If you are aiming to speed up processing / consumption of batches then
>> you can use a queue.  For example:
>>
>> # Once per client
>> pqf = pq.ParquetFile(f, memory_map=True)
>> # maxsize will control how much you all buffer in RAM
>> queue = queue.Queue(maxsize=32)
>> start_workers(queue)
>> for batch in pqf.iter_batches(self.rows_per_batch,
>> use_pandas_metadata=True):
>>   queue.put(batch)
>> queue.join()
>> stop_workers()
>>
>> # Each worker would do
>> while not stopped:
>>     next_batch = queue.get()
>>     process_batch(next_batch)
>>     queue.task_done()
>>
>> On Thu, Feb 3, 2022 at 8:03 AM Cindy McMullen <[email protected]>
>> wrote:
>> >
>> > We can't use Beam to parallelize multiple file reads, b/c
>> GraphDataLoader is specific to the model being trained.  So multiple Beam
>> processes can't share the same model (until we move into DGL distributed
>> mode later this year).
>> >
>> > We're trying to optimize throughput of the GraphDataLoader
>> consume/process these Parquet files.
>> >
>> > On Thu, Feb 3, 2022 at 11:01 AM Cindy McMullen <[email protected]>
>> wrote:
>> >>
>> >> The use case is for a GraphDataLoader to run w/ multiple threads.
>> GraphDataLoade invokes its DGLDataset, which loads these Parquet files to
>> convert into DGL-compatible graph objects.
>> >>
>> >>
>> >> On Thu, Feb 3, 2022 at 10:00 AM Micah Kornfield <[email protected]>
>> wrote:
>> >>>
>> >>> Hi Cindy,
>> >>>>
>> >>>> I'd like to ingest  batches within a Parquet file in parallel.
>> >>>
>> >>> What is the motivation here?  Is it speeding up Parquet reading or
>> processing after the fact?
>> >>>
>> >>>
>> >>> Side note, the size of your row groups seems quite small (it might be
>> right for your specific use-case).
>> >>>
>> >>> Cheers,
>> >>> Micah
>> >>>
>> >>> On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen <[email protected]>
>> wrote:
>> >>>>
>> >>>> Maybe -- will give it a try.  Thanks for the suggestion.
>> >>>>
>> >>>> On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <[email protected]>
>> wrote:
>> >>>>>
>> >>>>> There is a parameter to iter_batches where you can pass in the
>> row_group number, or a list of row groups. Would this help to read the
>> Parquet file in parallel?
>> >>>>>
>> >>>>> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <
>> [email protected]> wrote:
>> >>>>>>
>> >>>>>> Hi -
>> >>>>>>
>> >>>>>> I'd like to ingest  batches within a Parquet file in parallel.
>> The client (DGLDataset) needs to be thread-safe.  What's the best API for
>> me to use  to do so?
>> >>>>>>
>> >>>>>> Here's the metadata for one example file:
>> >>>>>>
>> >>>>>>   <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
>> >>>>>>   created_by: parquet-mr version 1.12.0 (build
>> db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>> >>>>>>   num_columns: 4
>> >>>>>>   num_rows: 1000000
>> >>>>>>   num_row_groups: 9997
>> >>>>>>   format_version: 1.0
>> >>>>>>   serialized_size: 17824741
>> >>>>>>
>> >>>>>> I want the consumption of batches to be distributed among multiple
>> workers.  I'm currently trying something like this:
>> >>>>>>
>> >>>>>> # Once per client
>> >>>>>>
>> >>>>>> pqf = pq.ParquetFile(f, memory_map=True)
>> >>>>>>
>> >>>>>>
>> >>>>>> # Ideally, each worker can do this, but ParquetFile.iter_batches
>> is not thread-safe.  This makes intuitive sense.
>> >>>>>> pq_batches = pqf.iter_batches(self.rows_per_batch,
>> use_pandas_metadata=True)
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> My workaround is to buffer these ParquetFile batches into
>> DataFrame [], but this is memory-intensive, so will not scale to multiple
>> of these input files.
>> >>>>>>
>> >>>>>> What's a better PyArrow pattern to use so I can distribute batches
>> to my workers in a thread-safe manner?
>> >>>>>>
>> >>>>>> Thanks --
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>> --
>> >>>>> Partha Dutta
>> >>>>> [email protected]
>>
>

Reply via email to