On Sun, Aug 18, 2019 at 10:45 AM Oliver Laslett <[email protected]> wrote:
> Hi Cham, > > That's really helpful thank you. I think fileio.MatchAll is basically what > I needed FileBasedSource for. > > What did you mean by "use Beam's filesystems abstraction in your ParDo"? > If you want to connect to all file-systems supported by Beam (currently GCS, HDFS, and local) in an abstract way, you can use the filesystems API: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py Thanks, Cham > > Cheers > Oliver > > On Fri, Aug 16, 2019 at 2:32 AM Chamikara Jayalath <[email protected]> > wrote: > >> >> >> On Wed, Aug 14, 2019 at 7:19 AM Oliver Laslett <[email protected]> wrote: >> >>> What is the correct way to implement a custom non-splittable file parser >>> in python? >>> >>> My desired end-state is: 1) use Read to pass a file pattern (with wild >>> cards) pointing to several XML files on remote storage (S3 or GCS). 2) each >>> file is parsed as a single element (XML cannot be processed line-by-line) >>> resulting in a PCollection. 3) combine all PCollections together. >>> >>> I've subclassed FileBasedSource, which seems to give me everything out >>> of the box. However I have a problem with zipped files. >>> The self.open_file(fname) method returns a file object. For >>> non-compressed files I can call self.open_file(fname).read(). But for >>> compressed files I have a missing argument error and must provide the >>> number of bytes to read: self.open_file(fname).read(num_bytes). >>> >>> Is it possible to implement a FileBasedSource that works generically for >>> compressed and non-compressed non-splittable files? >>> >> >> It should be possible. I'm not sure what your issue was >> though. self.open_file() should return a file-like object (a CompressedFile >> object if you specified a compression type). In your read_records() >> implementation, you are expected to read bytes from this file (not all >> bytes have to be read in a single call) and produce an iterator for reading >> records. >> >> If your files are non splittable though, FileBasedSource does not add >> much value. I suggest also looking into fileio.MatchAll transform and >> implementing your source as a composite that uses fileio.MatchAll followed >> by a ParDo that produces records. You can use Beam's filesystems >> abstraction in your ParDo to get easy access to all filesystems supported >> by Beam. >> >> Thanks, >> Cham >> >> > > > -- > > Oliver Laslett > > Machine Learning Scientist | Cytora > > We're hiring! <http://www.cytora.com/careers> > > W: www.cytora.com > > 9 Dallington Street | London, EC1V 0LN > > This email is confidential and intended for the use of the addressee only. > If you receive this email in error, please accept our apology and delete it > immediately. Please inform us if you have received this email in error. >
