If you are aiming to speed up the processing/consumption of batches, then
you can use a queue. For example:
# Once per client
pqf = pq.ParquetFile(f, memory_map=True)

# maxsize will control how much you buffer in RAM
batch_queue = queue.Queue(maxsize=32)
start_workers(batch_queue)
for batch in pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True):
    batch_queue.put(batch)
batch_queue.join()
stop_workers()

# Each worker would do
while not stopped:
    next_batch = batch_queue.get()
    process_batch(next_batch)
    batch_queue.task_done()
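
If it helps, here is a fuller sketch of the same pattern with the
start_workers / stop_workers placeholders filled in using threading.Thread
and sentinel values to shut the workers down. NUM_WORKERS, ROWS_PER_BATCH,
and process_batch are assumptions that stand in for your own settings and
your DGL conversion logic:

import queue
import threading

import pyarrow.parquet as pq

NUM_WORKERS = 4          # assumption: tune to your workload
ROWS_PER_BATCH = 10_000  # assumption: stands in for self.rows_per_batch

def process_batch(batch):
    # Placeholder: convert the RecordBatch into DGL-compatible graph objects.
    pass

def worker(batch_queue):
    while True:
        next_batch = batch_queue.get()
        if next_batch is None:          # sentinel: no more batches
            batch_queue.task_done()
            break
        process_batch(next_batch)
        batch_queue.task_done()

def consume(path):
    batch_queue = queue.Queue(maxsize=32)
    workers = [threading.Thread(target=worker, args=(batch_queue,))
               for _ in range(NUM_WORKERS)]
    for w in workers:
        w.start()

    pqf = pq.ParquetFile(path, memory_map=True)
    for batch in pqf.iter_batches(ROWS_PER_BATCH, use_pandas_metadata=True):
        batch_queue.put(batch)          # blocks while the queue is full

    for _ in workers:
        batch_queue.put(None)           # one sentinel per worker
    batch_queue.join()
    for w in workers:
        w.join()

Note that only the single producer thread ever touches iter_batches, so its
lack of thread safety is not an issue; the workers only see RecordBatches
that have already been read.
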
On Thu, Feb 3, 2022 at 8:03 AM Cindy McMullen <[email protected]> wrote:
>
> We can't use Beam to parallelize multiple file reads, b/c GraphDataLoader is
> specific to the model being trained. So multiple Beam processes can't share
> the same model (until we move into DGL distributed mode later this year).
>
> We're trying to optimize the throughput of the GraphDataLoader as it
> consumes/processes these Parquet files.
>
> On Thu, Feb 3, 2022 at 11:01 AM Cindy McMullen <[email protected]> wrote:
>>
>> The use case is for a GraphDataLoader to run w/ multiple threads.
>> GraphDataLoader invokes its DGLDataset, which loads these Parquet files and
>> converts them into DGL-compatible graph objects.
>>
>>
>> On Thu, Feb 3, 2022 at 10:00 AM Micah Kornfield <[email protected]>
>> wrote:
>>>
>>> Hi Cindy,
>>>>
>>>> I'd like to ingest batches within a Parquet file in parallel.
>>>
>>> What is the motivation here? Is it speeding up Parquet reading or
>>> processing after the fact?
>>>
>>>
>>> Side note: the size of your row groups seems quite small (though it might be
>>> right for your specific use case).
>>>
>>> Cheers,
>>> Micah
>>>
>>> On Thu, Feb 3, 2022 at 8:01 AM Cindy McMullen <[email protected]> wrote:
>>>>
>>>> Maybe -- will give it a try. Thanks for the suggestion.
>>>>
>>>> On Thu, Feb 3, 2022 at 7:56 AM Partha Dutta <[email protected]> wrote:
>>>>>
>>>>> There is a parameter to iter_batches where you can pass in the row_group
>>>>> number, or a list of row groups. Would this help to read the Parquet file
>>>>> in parallel?
>>>>>
>>>>> On Thu, Feb 3, 2022 at 8:31 AM Cindy McMullen <[email protected]>
>>>>> wrote:
>>>>>>
>>>>>> Hi -
>>>>>>
>>>>>> I'd like to ingest batches within a Parquet file in parallel. The
>>>>>> client (DGLDataset) needs to be thread-safe. What's the best API for me
>>>>>> to use to do so?
>>>>>>
>>>>>> Here's the metadata for one example file:
>>>>>>
>>>>>> <pyarrow._parquet.FileMetaData object at 0x7fbb05c64050>
>>>>>> created_by: parquet-mr version 1.12.0 (build
>>>>>> db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
>>>>>> num_columns: 4
>>>>>> num_rows: 1000000
>>>>>> num_row_groups: 9997
>>>>>> format_version: 1.0
>>>>>> serialized_size: 17824741
>>>>>>
>>>>>> I want the consumption of batches to be distributed among multiple
>>>>>> workers. I'm currently trying something like this:
>>>>>>
>>>>>> # Once per client
>>>>>>
>>>>>> pqf = pq.ParquetFile(f, memory_map=True)
>>>>>>
>>>>>>
>>>>>> # Ideally, each worker could do this, but ParquetFile.iter_batches is not
>>>>>> # thread-safe. This makes intuitive sense.
>>>>>> pq_batches = pqf.iter_batches(self.rows_per_batch, use_pandas_metadata=True)
>>>>>>
>>>>>>
>>>>>>
>>>>>> My workaround is to buffer these ParquetFile batches into a list of
>>>>>> DataFrames, but this is memory-intensive, so it will not scale to multiple
>>>>>> input files.
>>>>>>
>>>>>> What's a better PyArrow pattern to use so I can distribute batches to my
>>>>>> workers in a thread-safe manner?
>>>>>>
>>>>>> Thanks --
>>>>>>
>>>>>
>>>>> --
>>>>> Partha Dutta
>>>>> [email protected]