If you are aiming to speed up processing / consumption of batches then
you can use a queue.  For example:

# Once per client
pqf = pq.ParquetFile(f, memory_map=True)
# maxsize will control how much you all buffer in RAM
queue = queue.Queue(maxsize=32)
start_workers(queue)
for batch in pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True):
  queue.put(batch)
queue.join()
stop_workers()

# Each worker would do
while not stopped:
    next_batch = queue.get()
    process_batch(next_batch)
    queue.task_done()

On Thu, Feb 3, 2022 at 8:03 AM Cindy McMullen <[email protected]> wrote:
>
> We can't use Beam to parallelize multiple file reads, b/c GraphDataLoader is 
> specific to the model being trained.  So multiple Beam processes can't share 
> the same model (until we move into DGL distributed mode later this year).
>
> We're trying to optimize throughput of the GraphDataLoader consume/process 
> these Parquet files.
>
> On Thu, Feb 3, 2022 at 11:01 AM Cindy McMullen <[email protected]> wrote:
>>
>> The use case is for a GraphDataLoader to run w/ multiple threads.  
>> GraphDataLoade invokes its DGLDataset, which loads these Parquet files to 
>> convert into DGL-compatible graph objects.
>>
>>
>> On Thu, Feb 3, 2022 at 10:00 AM Micah Kornfield <[email protected]> 
>> wrote:
>>>
>>> Hi Cindy,
>>>>
>>>> I'd like to ingest  batches within a Parquet file in parallel.
>>>
>>> What is the motivation here?  Is it speeding up Parquet reading or 
>>> processing after the fact?
>>>
>>>
>>> Side note, the size of your row groups seems quite small (it might be right 
>>> for your specific use-case).
>>>
>>> Cheers,
>>> Micah
>>>
>>> On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen <[email protected]> wrote:
>>>>
>>>> Maybe -- will give it a try.  Thanks for the suggestion.
>>>>
>>>> On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <[email protected]> wrote:
>>>>>
>>>>> There is a parameter to iter_batches where you can pass in the row_group 
>>>>> number, or a list of row groups. Would this help to read the Parquet file 
>>>>> in parallel?
>>>>>
>>>>> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <[email protected]> 
>>>>> wrote:
>>>>>>
>>>>>> Hi -
>>>>>>
>>>>>> I'd like to ingest  batches within a Parquet file in parallel.  The 
>>>>>> client (DGLDataset) needs to be thread-safe.  What's the best API for me 
>>>>>> to use  to do so?
>>>>>>
>>>>>> Here's the metadata for one example file:
>>>>>>
>>>>>>   <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
>>>>>>   created_by: parquet-mr version 1.12.0 (build 
>>>>>> db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>>>>>>   num_columns: 4
>>>>>>   num_rows: 1000000
>>>>>>   num_row_groups: 9997
>>>>>>   format_version: 1.0
>>>>>>   serialized_size: 17824741
>>>>>>
>>>>>> I want the consumption of batches to be distributed among multiple 
>>>>>> workers.  I'm currently trying something like this:
>>>>>>
>>>>>> # Once per client
>>>>>>
>>>>>> pqf = pq.ParquetFile(f, memory_map=True)
>>>>>>
>>>>>>
>>>>>> # Ideally, each worker can do this, but ParquetFile.iter_batches is not 
>>>>>> thread-safe.  This makes intuitive sense.
>>>>>> pq_batches = pqf.iter_batches(self.rows_per_batch, 
>>>>>> use_pandas_metadata=True)
>>>>>>
>>>>>>
>>>>>>
>>>>>> My workaround is to buffer these ParquetFile batches into DataFrame [], 
>>>>>> but this is memory-intensive, so will not scale to multiple of these 
>>>>>> input files.
>>>>>>
>>>>>> What's a better PyArrow pattern to use so I can distribute batches to my 
>>>>>> workers in a thread-safe manner?
>>>>>>
>>>>>> Thanks --
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Partha Dutta
>>>>> [email protected]

Reply via email to