You'll want to extend FileBasedSource, passing splittable=False and compression_type=CompressionTypes.UNCOMPRESSED:
https://github.com/apache/beam/blob/59598d8f41e65f9a068d7446457395e112dc3bc7/sdks/python/apache_beam/io/filebasedsource.py
You'll want to override the "open_file" method to wrap the stream being returned so that the decompression/decryption occurs.

On Tue, Jun 20, 2017 at 11:12 AM, Sachin Shetty <[email protected]> wrote:

> Thank you Lukasz for the link, I will try to build my custom source on the
> same lines.
>
> Any pointers on how we can do this in python?
>
> On Tue, Jun 20, 2017 at 8:29 PM, Lukasz Cwik <[email protected]> wrote:
>
>> Take a look at CompressedSource:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
>>
>> I feel as though you could follow the same pattern to decompress/decrypt
>> the data as a wrapper.
>>
>> Apache Beam supports a concept of dynamic work rebalancing
>> (https://beam.apache.org/blog/2016/05/18/splitAtFraction-method.html)
>> which allows a runner to utilize its fleet of machines more efficiently.
>> For file-based sources, this relies on being able to partition each
>> input file into smaller pieces for parallel processing. Unless the
>> compression/encryption method supports efficient seeking within the
>> uncompressed data, the smallest granularity of work rebalancing you will
>> have is the whole file (since decompressing/decrypting from the
>> beginning of the file to read an arbitrary offset is usually very
>> inefficient).
>>
>> On Tue, Jun 20, 2017 at 6:00 AM, Sachin Shetty <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I am trying to process some of our access logs using Beam. Log files
>>> are archived to a GCS bucket, but they are lz compressed and encrypted
>>> using gpg.
>>>
>>> Any idea how I could load the files into a pipeline without decrypting,
>>> decompressing and staging the file before feeding it to a Beam
>>> pipeline?
>>>
>>> I see that I could write a custom coder, but I could not figure out the
>>> specifics.
>>>
>>> Thanks
>>> Sachin
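The stream-wrapping idea behind overriding open_file can be sketched in plain Python. This is a minimal, self-contained example using stdlib gzip as a stand-in for the actual LZ + GPG pipeline (decryption would need a third-party library such as python-gnupg, which is an assumption here, not something the Beam SDK provides):

```python
import gzip
import os
import tempfile

def open_decompressing(path):
    """Open `path` and return a file-like object yielding decompressed
    bytes -- the same wrapping an overridden open_file() would perform
    on the raw stream before the source's record reader sees it."""
    raw = open(path, "rb")
    return gzip.GzipFile(fileobj=raw, mode="rb")

# Round-trip demonstration with a throwaway gzip file.
payload = b"127.0.0.1 GET /index.html\n10.0.0.2 GET /health\n"
fd, path = tempfile.mkstemp(suffix=".gz")
with os.fdopen(fd, "wb") as f:
    with gzip.GzipFile(fileobj=f, mode="wb") as gz:
        gz.write(payload)

with open_decompressing(path) as stream:
    recovered = stream.read()
os.remove(path)
```

In the real source you would do this wrapping inside your FileBasedSource subclass's open_file, chaining the decrypting wrapper around the raw GCS stream and the decompressing wrapper around that.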

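The work-rebalancing point quoted above can be seen with any stream compressor: reading an arbitrary uncompressed offset forces you to decompress everything before it, which is why a non-seekable compressed/encrypted file can only be rebalanced at whole-file granularity. A small stdlib sketch, with gzip standing in for the LZ codec:

```python
import gzip
import io

data = bytes(range(256)) * 64          # 16 KiB of uncompressed sample data
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
    gz.write(data)
buf.seek(0)

# There is no random access into the gzip stream: to reach uncompressed
# offset 10000 we must decompress (and discard) the 10000 bytes before it.
with gzip.GzipFile(fileobj=buf, mode="rb") as gz:
    gz.read(10000)                     # decompress-and-discard the prefix
    chunk = gz.read(16)
```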