Hi there,

I have a Python pipeline that reads a PCollection of zip/tar file paths
from BigQuery, makes them readable by applying ReadMatches, and then
unzips the contents inside a ParDo.

     input  # a PCollection of GCS paths of zip files
      | "Load ZIP archives" >> fileio.ReadMatches(skip_directories=True)
      # decompress is the method that loops through the contents of the
      # zip file and adds each member to the emitted output
      | "Decompress zip archives" >> beam.ParDo(decompress)

Since the zip files are of variable sizes and can contain an arbitrary
number of smaller files, some instances of the decompress method can take a
pretty long time while others finish relatively early, resulting in an
uneven computational load. I am thinking of tackling this by converting
decompress into a SplittableDoFn.

The basic structure of the decompressor looks like this:
from zipfile import ZipFile

with ZipFile(archive) as compressed_data:
    for member in compressed_data.infolist():
        # open each member, read its raw bytes, and collect them for output
        with compressed_data.open(member) as decompressed_file:
            output.append(decompressed_file.read())

I need some advice on this and have a few questions:

- Does a SplittableDoFn seem like a reasonable approach here?
- The plan I have in mind is possibly to write my own RestrictionTracker
(inheriting from iobase.RestrictionTracker) and RestrictionProvider, where
the provider would just yield the next file (member) of the compressed
archive (see the rough sketch after this list). Does that sound like a
reasonable approach?
- Could you share a code snippet that might help? I didn't find many
examples of SDFs in Python.
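
For reference, here is a rough sketch of what I am imagining. Instead of a
fully custom tracker it uses the built-in OffsetRange/OffsetRestrictionTracker
over member indices, it assumes the ReadableFile handle coming out of
ReadMatches is seekable (ZipFile needs random access), and the class names are
just placeholders; I haven't tested this end to end:

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from zipfile import ZipFile


class ZipMemberRestrictionProvider(beam.transforms.core.RestrictionProvider):
    """One restriction = a range of member indices within one archive."""

    def initial_restriction(self, readable_file):
        # Count the members once so the runner knows the full range.
        with ZipFile(readable_file.open()) as archive:
            return OffsetRange(0, len(archive.infolist()))

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, readable_file, restriction):
        return restriction.size()


class DecompressZipMembers(beam.DoFn):
    def process(
        self,
        readable_file,
        tracker=beam.DoFn.RestrictionParam(ZipMemberRestrictionProvider())):
        # Assumes readable_file.open() returns a seekable handle.
        with ZipFile(readable_file.open()) as archive:
            members = archive.infolist()
            position = tracker.current_restriction().start
            # Claim one member index at a time; the runner may split the
            # remaining range onto another worker between claims.
            while tracker.try_claim(position):
                with archive.open(members[position]) as member_file:
                    yield member_file.read()
                position += 1

The last step of the pipeline would then become
beam.ParDo(DecompressZipMembers()) instead of beam.ParDo(decompress). Does
this look roughly right?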

Any questions - don't hesitate to ask. Thanks in advance.

- Asif

-- 
Asif Iqbal

Senior Software Engineer
BenchSci
www.benchsci.com
E: aiq...@benchsci.com
