On Wed, Aug 14, 2019 at 7:19 AM Oliver Laslett <[email protected]> wrote:
> What is the correct way to implement a custom non-splittable file parser
> in python?
>
> My desired end-state is: 1) use Read to pass a file pattern (with wild
> cards) pointing to several XML files on remote storage (S3 or GCS). 2) each
> file is parsed as a single element (XML cannot be processed line-by-line)
> resulting in a PCollection. 3) combine all PCollections together.
>
> I've subclassed FileBasedSource, which seems to give me everything out of
> the box. However I have a problem with zipped files.
> The self.open_file(fname) method returns a file object. For non-compressed
> files I can call self.open_file(fname).read(). But for compressed files I
> have a missing argument error and must provide the number of bytes to read:
> self.open_file(fname).read(num_bytes).
>
> Is it possible to implement a FileBasedSource that works generically for
> compressed and non-compressed non-splittable files?
>

It should be possible. I'm not sure what your issue was, though.
self.open_file() should return a file-like object (a CompressedFile object
if you specified a compression type). In your read_records() implementation,
you are expected to read bytes from this file (not all bytes have to be read
in a single call) and produce an iterator for reading records.

If your files are non-splittable, though, FileBasedSource does not add much
value. I suggest also looking into the fileio.MatchAll transform and
implementing your source as a composite that uses fileio.MatchAll followed
by a ParDo that produces records. You can use Beam's filesystems abstraction
in your ParDo to get easy access to all filesystems supported by Beam.

Thanks,
Cham
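
A minimal sketch of the composite suggested above, under a few assumptions. The helper names (read_all, parse_xml_file, parse_path, read_xml_files) are hypothetical and not part of Beam. The sketch assumes FileSystems.open() picks the compression type from the file extension by default, and it uses beam.Map with a plain function where a full ParDo/DoFn would also work. Reading in fixed-size chunks until read() returns nothing avoids relying on an argument-less read(), so the same code should work for compressed and non-compressed files. The emitted record shape is only a placeholder.

```python
import xml.etree.ElementTree as ET


def read_all(file_obj, chunk_size=1 << 20):
    """Read a file-like object to the end in fixed-size chunks.

    Always passes a size to read(), so it does not depend on the
    file object supporting read() with no arguments.
    """
    chunks = []
    while True:
        chunk = file_obj.read(chunk_size)
        if not chunk:  # empty bytes means end of file
            break
        chunks.append(chunk)
    return b"".join(chunks)


def parse_xml_file(file_obj):
    """Parse the whole file as one XML document and return its root."""
    return ET.fromstring(read_all(file_obj))


def parse_path(path):
    """Open one matched file through Beam's filesystem layer and parse it."""
    # Imported lazily so the helpers above can be used without Beam installed.
    from apache_beam.io.filesystems import FileSystems

    f = FileSystems.open(path)  # assumption: compression inferred from extension
    try:
        root = parse_xml_file(f)
    finally:
        f.close()
    # Placeholder record: replace with whatever your pipeline needs.
    return (path, root.tag)


def read_xml_files(pipeline, file_pattern):
    """Composite: match the pattern, then parse each file as one element."""
    import apache_beam as beam
    from apache_beam.io import fileio

    return (
        pipeline
        | "Pattern" >> beam.Create([file_pattern])
        | "Match" >> fileio.MatchAll()
        | "Parse" >> beam.Map(lambda metadata: parse_path(metadata.path))
    )
```

Since every file becomes one element of a single PCollection, there is no separate step needed to combine per-file PCollections.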
