Hi there, I have a Python pipeline that reads a PCollection of zip/tar file paths from BigQuery, makes them readable by applying ReadMatches, and then unzips the contents inside a ParDo:
    input  # a PCollection of GCS paths of zip files
        | "Load ZIP archives" >> fileio.ReadMatches(skip_directories=True)
        | "Decompress zip archives" >> beam.ParDo(decompress)  # decompress loops through the contents of the zip file and emits each member

Since the zip files are of variable sizes and can contain an arbitrary number of smaller files, some instances of the decompress method can take a pretty long time while others finish relatively early, resulting in an uneven computational load. I am thinking of tackling this by converting decompress to a SplittableDoFn. The basic structure of the decompressor looks like this:

    with ZipFile(archive) as compressed_data:
        for member in compressed_data.infolist():
            with compressed_data.open(member) as decompressed_file:
                raw_contents = decompressed_file.read()
                output.append(raw_contents)

I need some advice on this and have a few questions:

- Does SplittableDoFn seem like a reasonable approach here?
- The plan I have in mind is to write my own RestrictionTracker (inheriting iobase.RestrictionTracker) and RestrictionProvider, where the provider would just yield the next file (member) of the compressed archive. Does that sound like a reasonable approach?
- Is it possible to share a code snippet that might be helpful? I didn't find many examples of SDFs in Python. (I've pasted the rough skeleton I have in mind below, after my signature.)

Any questions - don't hesitate to ask. Thanks in advance.

- Asif

--
Asif Iqbal
Senior Software Engineer
BenchSci
www.benchsci.com
E: aiq...@benchsci.com
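P.S. To make the plan in my second question concrete, here is the rough, untested skeleton I have in mind. Everything below is a sketch, not working code: it reuses Beam's stock OffsetRange and OffsetRestrictionTracker (indexing archive members by their position in infolist()) instead of a fully custom tracker, the class names are my own placeholders, and it assumes the handle returned by ReadableFile.open() is seekable (ZipFile has to seek to the central directory; if it isn't, the bytes would need to be buffered into io.BytesIO first).

    import apache_beam as beam
    from apache_beam.io.iobase import RestrictionProvider
    from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
    from zipfile import ZipFile


    class ZipMemberRestrictionProvider(RestrictionProvider):
        """Restriction = a half-open range of member indices within one archive."""

        def initial_restriction(self, readable_file):
            # One pass over the central directory just to count the members.
            with ZipFile(readable_file.open()) as archive:
                return OffsetRange(0, len(archive.infolist()))

        def create_tracker(self, restriction):
            # The stock offset tracker already knows how to split an OffsetRange,
            # so no custom RestrictionTracker subclass should be needed.
            return OffsetRestrictionTracker(restriction)

        def restriction_size(self, readable_file, restriction):
            return restriction.size()


    class DecompressZipFn(beam.DoFn):
        def process(
            self,
            readable_file,
            tracker=beam.DoFn.RestrictionParam(ZipMemberRestrictionProvider())):
            with ZipFile(readable_file.open()) as archive:
                members = archive.infolist()  # deterministic central-directory order
                position = tracker.current_restriction().start
                # try_claim returns False once the runner splits off the rest of
                # the range, which is what lets a straggler archive be rebalanced.
                while tracker.try_claim(position):
                    with archive.open(members[position]) as member_file:
                        yield member_file.read()
                    position += 1

It would slot into the pipeline in place of the current ParDo:

    input | fileio.ReadMatches(skip_directories=True) | beam.ParDo(DecompressZipFn())

As far as I can tell, whether mid-archive splitting actually happens depends on the runner, so I'd appreciate a sanity check on the whole approach.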