Can you use a combiner[1] for the aggregations?

1: https://beam.apache.org/documentation/programming-guide/#combine
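A minimal sketch of what such a combiner might look like, assuming the decrypted records have already been parsed into a PCollection<Double>; the element type and the mean aggregation here are illustrative, not something the thread prescribes:

    import java.io.Serializable;
    import org.apache.beam.sdk.transforms.Combine;

    // Illustrative CombineFn computing a mean. Accumulators hold only a
    // running sum and count, so the runner can merge partial results in
    // parallel without ever materializing all records in memory.
    public class AverageFn extends Combine.CombineFn<Double, AverageFn.Accum, Double> {
      public static class Accum implements Serializable {
        double sum = 0;
        long count = 0;
      }

      @Override
      public Accum createAccumulator() {
        return new Accum();
      }

      @Override
      public Accum addInput(Accum acc, Double input) {
        acc.sum += input;
        acc.count++;
        return acc;
      }

      @Override
      public Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum acc : accums) {
          merged.sum += acc.sum;
          merged.count += acc.count;
        }
        return merged;
      }

      @Override
      public Double extractOutput(Accum acc) {
        return acc.count == 0 ? Double.NaN : acc.sum / acc.count;
      }
    }

Usage would be something like records.apply(Combine.globally(new AverageFn())), which keeps the aggregation streaming and parallel instead of buffering the whole file in memory.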
On Thu, Aug 9, 2018 at 11:03 AM Aniruddh Sharma <[email protected]> wrote:

> Thanks for the reply.
>
> In my application I need to do some aggregations, so I would have to store
> the contents of the file in memory, and that will not scale (unless I can
> keep reading it record by record and store it in some intermediary form; I
> am not allowed to store temporary intermediate unencrypted data on GCS).
> In this case, do you suggest that I read the channel, write record by
> record to Pubsub, use it as a buffer, and build something to consume
> records from Pubsub?
>
> Thanks
> Aniruddh
>
> On Thu, Aug 9, 2018 at 11:57 AM, Lukasz Cwik <[email protected]> wrote:
>
>> Sharding is runner specific. For example, Dataflow chooses shards at
>> sources (like TextIO/FileIO/AvroIO/...) and at GroupByKey. Dataflow uses
>> a capability exposed in Apache Beam to ask a source to "split" a shard
>> that is currently being processed into smaller shards. Dataflow also has
>> the ability to concatenate multiple shards, creating a larger shard.
>> There is a lot more detail about this in the execution model
>> documentation[1].
>>
>> FileIO doesn't "read" the entire file; it just produces metadata records
>> which contain the name of the file and a few other properties. The ParDo
>> that consumes the FileIO output gets a handle to a ReadableByteChannel.
>> The ParDo that you write will likely use a decrypting ReadableByteChannel
>> that wraps the ReadableByteChannel provided by the FileIO output. This
>> means that the ParDo will only use an amount of memory approximately
>> equivalent to reading a record, plus any overhead in the implementation
>> required to perform decryption and any overhead due to buffering the
>> contents of the file. So the file size doesn't matter; only the record
>> size within the file matters.
>>
>> 1: https://beam.apache.org/documentation/execution-model/
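A minimal sketch of the decrypting ParDo described above. The AES/CTR cipher, the assumption that a 16-byte IV is prepended to the ciphertext, and the getDecryptionKey() helper are all illustrative, not prescribed by the thread; substitute whatever your encryption scheme and KMS client actually require:

    import java.io.BufferedReader;
    import java.io.DataInputStream;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.channels.Channels;
    import java.nio.charset.StandardCharsets;
    import javax.crypto.Cipher;
    import javax.crypto.CipherInputStream;
    import javax.crypto.SecretKey;
    import javax.crypto.spec.IvParameterSpec;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.transforms.DoFn;

    // Illustrative DoFn: consumes FileIO's file descriptors, opens the
    // channel, wraps it in a decrypting stream, and emits one record at a
    // time so memory use tracks the record size rather than the file size.
    public class DecryptAndParseFn extends DoFn<FileIO.ReadableFile, String> {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        FileIO.ReadableFile file = c.element();
        InputStream encrypted = Channels.newInputStream(file.open());

        // Assumption: the 16-byte AES IV was prepended to the ciphertext.
        byte[] iv = new byte[16];
        new DataInputStream(encrypted).readFully(iv);

        // Hypothetical helper that fetches/unwraps the customer-supplied
        // key via your KMS client.
        SecretKey key = getDecryptionKey(file.getMetadata().resourceId().toString());
        Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));

        // Record parsing is up to you; newline-delimited records assumed.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
            new CipherInputStream(encrypted, cipher), StandardCharsets.UTF_8))) {
          String line;
          while ((line = reader.readLine()) != null) {
            c.output(line);
          }
        }
      }

      private SecretKey getDecryptionKey(String fileName) {
        // Placeholder: call your KMS client here to obtain the data key.
        throw new UnsupportedOperationException("KMS lookup not shown");
      }
    }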
>> On Thu, Aug 9, 2018 at 6:46 AM Aniruddh Sharma <[email protected]> wrote:
>>
>>> Thanks Lukasz for the help. It's very helpful and I understand it now.
>>> But one further question on it.
>>> Let's say that, using the Java SDK, I implement this approach: "Your
>>> best bet would be use FileIO[1] followed by a ParDo that accesses the
>>> KMS to get the decryption key and wraps the readable channel with a
>>> decryptor."
>>>
>>> Now how does Beam create shards for functions called within a ParDo? Is
>>> it based on some size of data that it assesses when executing inside the
>>> ParDo, and how are these shards executed against different machines?
>>>
>>> For example:
>>> Scenario 1 - File size is 1 MB and I keep default autoscaling on. On
>>> execution, Beam decides to execute decryption (inside the ParDo) on one
>>> machine and it works.
>>> Scenario 2a - File size is 1 GB (machine is n1-standard-4) and default
>>> autoscaling is on. It still decides to spin up only one machine, goes
>>> out of heap memory, and gets itself killed.
>>> Scenario 2b - File size is 1 GB (machine is n1-standard-4), autoscaling
>>> is off, and I force it to start with 2 machines. It still decides to
>>> execute the file-decrypting logic on one machine, the other machine
>>> doesn't do anything, and it still gets killed.
>>>
>>> I know the file is unsplittable (for decryption) and I actually want to
>>> execute it on only one machine. But because it is in a ParDo and it is
>>> application logic, I have not understood in this context when a ParDo
>>> decides to execute a function on different machines.
>>>
>>> For example, in Spark the number of partitions in an RDD determines what
>>> gets parallelized and executed either in the same executor (in a
>>> different wave or on another core) or on different executors.
>>>
>>> In the above example, how do I make sure that if the file size
>>> increases, the ParDo will not try to scale it? I am new to Beam - if
>>> this query doesn't make sense then apologies in advance.
>>>
>>> Thanks
>>> Aniruddh
>>>
>>> On Wed, Aug 8, 2018 at 11:52 AM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> a) FileIO produces elements which are file descriptors; each descriptor
>>>> represents one file. If you have many files to read then you will get
>>>> parallel processing, since multiple files will be read concurrently.
>>>> You could get parallel reading within a single file, but you would need
>>>> to be able to split the decrypted file by knowing how to randomly seek
>>>> in the encrypted stream and start reading the decrypted stream without
>>>> having to decrypt everything before that point.
>>>>
>>>> b) You would need to add support to the underlying filesystem, like
>>>> S3[1] or GCS[2], for file decryption.
>>>>
>>>> Note that for solutions a) and b), you can only get efficient parallel
>>>> reading within a single file if you can efficiently seek within the
>>>> "decrypted" stream. Many encryption methods do not allow for this and
>>>> typically require you to decrypt everything up to the point where you
>>>> want to read, making parallel reading of a single file extremely
>>>> inefficient. This is a common problem for compressed files as well: you
>>>> can't choose an arbitrary spot in a compressed file and start
>>>> decompressing without decompressing everything before it.
>>>>
>>>> 1: https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
>>>> 2: https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
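A minimal sketch of how these pieces wire together, one element per matched file: FileIO.match() emits file metadata, readMatches() turns each match into a ReadableFile descriptor, and a per-file DoFn (such as the DecryptAndParseFn sketched earlier) does the decryption. The file pattern and transform names are illustrative:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    public class DecryptPipeline {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Parallelism comes from having many files; a single encrypted file
        // cannot be split, so each file is read by one worker end to end.
        PCollection<String> records =
            p.apply("MatchFiles", FileIO.match().filepattern("gs://my-bucket/encrypted/*"))
             .apply("ReadMatches", FileIO.readMatches())
             .apply("DecryptAndParse", ParDo.of(new DecryptAndParseFn()));

        // Downstream aggregation, e.g. Combine.globally(...), goes here.

        p.run().waitUntilFinish();
      }
    }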
>>>> On Tue, Aug 7, 2018 at 1:06 PM Aniruddh Sharma <[email protected]> wrote:
>>>>
>>>>> Thanks Lukasz for the reply.
>>>>> a) I have a further question; I may not be understanding it right. If
>>>>> I embed this logic in a ParDo, won't the logic effectively limit the
>>>>> read to only one file, or will it still be a parallel operation? If it
>>>>> is a parallel operation, please elaborate.
>>>>>
>>>>> b) Is it possible to extend TextIO or AvroIO to decrypt keys (customer
>>>>> supplied)? If yes, please elaborate on how to do it.
>>>>>
>>>>> Thanks
>>>>> Aniruddh
>>>>>
>>>>> On Mon, Aug 6, 2018 at 8:30 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> I'm assuming you're talking about using the Java SDK. Additional
>>>>>> details about which SDK language, filesystem, and KMS you want to
>>>>>> access would be useful if the approach below doesn't work for you.
>>>>>>
>>>>>> Your best bet would be to use FileIO[1] followed by a ParDo that
>>>>>> accesses the KMS to get the decryption key and wraps the readable
>>>>>> channel with a decryptor.
>>>>>> Note that you'll need to apply your own file parsing logic to pull
>>>>>> out the records, as you won't be able to use things like AvroIO/TextIO
>>>>>> to do the record parsing for you.
>>>>>>
>>>>>> 1: https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/io/FileIO.html
>>>>>>
>>>>>> On Tue, Jul 31, 2018 at 10:27 AM [email protected] <[email protected]> wrote:
>>>>>>
>>>>>>> What is the best suggested way to read a file encrypted with a
>>>>>>> customer key which is also wrapped in KMS?
