Sharding is runner specific. For example, Dataflow chooses shards at
sources (like TextIO/FileIO/AvroIO/...) and at GroupByKey. Dataflow uses a
capability exposed in Apache Beam to ask a source to "split" a shard that
is currently being processed into smaller shards. Dataflow can also
concatenate multiple shards to create a larger shard. There is a lot more
detail about this in the execution model documentation[1].

FileIO doesn't "read" the entire file; it just produces metadata records
which contain the name of the file and a few other properties. The ParDo
that consumes the FileIO output gets a handle to a ReadableByteChannel.
The ParDo that you write will likely use a decrypting ReadableByteChannel
that wraps the ReadableByteChannel provided by the FileIO output. This
means that the ParDo will only use an amount of memory approximately
equal to one record, plus any overhead in the decryption implementation
and any overhead from buffering the contents of the file. So the file
size doesn't matter; only the record size within the file matters.
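
As a rough illustration of the wrapping described above, here is a minimal
sketch using only the JDK. Everything in it is an assumption made for the
example: the class and method names are hypothetical, the cipher
(AES/CBC/PKCS5Padding) and the newline-delimited record format are
placeholders for whatever your files actually use, and the key is
hard-coded where a real pipeline would fetch it from the KMS. It is not
Beam code; it only shows that records are decrypted and emitted one at a
time instead of loading the whole file.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper; the names and the cipher choice are assumptions.
final class DecryptingChannelSketch {
  private static final String TRANSFORMATION = "AES/CBC/PKCS5Padding";

  // Builds a cipher for the given mode (encrypt or decrypt).
  static Cipher cipher(int mode, byte[] key, byte[] iv) throws GeneralSecurityException {
    Cipher c = Cipher.getInstance(TRANSFORMATION);
    c.init(mode, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
    return c;
  }

  // Wraps an encrypted channel so that reads return decrypted bytes.
  // Bytes are decrypted as they are pulled, not all up front.
  static ReadableByteChannel decrypting(ReadableByteChannel encrypted, byte[] key, byte[] iv)
      throws GeneralSecurityException {
    return Channels.newChannel(
        new CipherInputStream(
            Channels.newInputStream(encrypted), cipher(Cipher.DECRYPT_MODE, key, iv)));
  }

  // Parses newline-delimited records and hands each one to the callback.
  // Only the current record and the reader's buffer are held in memory.
  static long readRecords(ReadableByteChannel decrypted, Consumer<String> emit)
      throws IOException {
    long count = 0;
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(Channels.newInputStream(decrypted), StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        emit.accept(line);
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) throws Exception {
    // Demo key and IV only; a real key would come from the KMS.
    byte[] key = "0123456789abcdef".getBytes(StandardCharsets.UTF_8);
    byte[] iv = new byte[16];
    byte[] encrypted = cipher(Cipher.ENCRYPT_MODE, key, iv)
        .doFinal("rec-1\nrec-2\nrec-3\n".getBytes(StandardCharsets.UTF_8));

    // Stand-in for the channel that the FileIO output would provide.
    ReadableByteChannel fileChannel =
        Channels.newChannel(new ByteArrayInputStream(encrypted));

    List<String> records = new ArrayList<>();
    long n = readRecords(decrypting(fileChannel, key, iv), records::add);
    System.out.println(n + " records: " + records);
  }
}
```

In a pipeline, the in-memory channel would be replaced by the channel
opened from the FileIO.ReadableFile element inside your DoFn, and the
callback would output each record to the next transform.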

1: https://beam.apache.org/documentation/execution-model/



On Thu, Aug 9, 2018 at 6:46 AM Aniruddh Sharma <[email protected]> wrote:

> Thanks Lukasz for the help. It's very helpful and I understand it now,
> but I have one further question.
> Let's say that, using the Java SDK, I implement this approach: "Your best
> bet would be to use FileIO[1] followed by a ParDo that accesses the KMS to
> get the decryption key and wraps the readable channel with a decryptor."
>
> Now, how does Beam create shards for functions called within a ParDo? Is
> it based on some size of data that it assesses when executing inside the
> ParDo, and how are these shards executed on different machines?
>
> For example
> Scenario 1 - File size is 1 MB and I keep the default autoscaling on. On
> execution Beam decides to run decryption (inside the ParDo) on one machine
> and it works.
> Scenario 2a - File size is 1 GB (machine is n1-standard-4) and default
> autoscaling is on. It still decides to spin up only one machine, runs out
> of heap memory, and gets killed.
> Scenario 2b - File size is 1 GB (machine is n1-standard-4), default
> autoscaling is off, and I force it to start with 2 machines. It still
> decides to execute the file decryption logic on one machine while the
> other machine does nothing, and it still gets killed.
>
> I know the file is unsplittable (for decryption) and I actually want to
> process it on only one machine. But because the decryption is in a ParDo
> and is application logic, I have not understood when, in this context, a
> ParDo decides to execute its function on different machines.
>
> For example, in Spark the partitions of an RDD are processed in parallel,
> either in the same executor (in a different wave or on another core) or
> on different executors.
>
> In the above example, how do I make sure that if the file size increases
> the ParDo will not try to scale it? I am new to Beam; if this query doesn't
> make sense then I apologize in advance.
>
> Thanks
> Aniruddh
>
> On Wed, Aug 8, 2018 at 11:52 AM, Lukasz Cwik <[email protected]> wrote:
>
>> a) FileIO produces elements which are file descriptors, each descriptor
>> represents one file. If you have many files to read then you will get
>> parallel processing since multiple files will be read concurrently. You
>> could get parallel reading in a single file but you would need to be able
>> to split the decrypted file by knowing how to randomly seek in the
>> encrypted stream and start reading the decrypted stream without having to
>> decrypt everything before that point.
>>
>> b) You need to add support to the underlying filesystem like S3[1] or
>> GCS[2] to support file decryption.
>>
>> Note for solution a) and b), you can only get efficient parallel reading
>> within a single file if you can efficiently seek within the "decrypted"
>> stream. Many encryption methods do not allow for this and typically require
>> you to decrypt everything up to where you want to read; making parallel
>> reading of a single file extremely inefficient. This is a common problem
>> for compressed files as well: you can't choose an arbitrary spot in the
>> compressed file and start decompressing without decompressing everything
>> before it.
>>
>> 1:
>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
>> 2:
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
>>
>> On Tue, Aug 7, 2018 at 1:06 PM Aniruddh Sharma <[email protected]>
>> wrote:
>>
>>> Thanks Lukasz for the reply.
>>> a) I have a further question, and I may not be understanding this right.
>>> If I embed this logic in a ParDo, won't that limit reading to only one
>>> file at a time since the logic is written directly in the ParDo, or will
>>> it still be a parallel operation? If it is a parallel operation, please
>>> elaborate.
>>>
>>> b) Is it possible to extend TextIO or AvroIO to decrypt with
>>> (customer-supplied) keys? If yes, please elaborate on how to do it.
>>>
>>> Thanks
>>> Aniruddh
>>>
>>> On Mon, Aug 6, 2018 at 8:30 PM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> I'm assuming you're talking about using the Java SDK. Additional details
>>>> about which SDK language, filesystem, and KMS you want to access would be
>>>> useful if the approach below doesn't work for you.
>>>>
>>>> Your best bet would be to use FileIO[1] followed by a ParDo that accesses
>>>> the KMS to get the decryption key and wraps the readable channel with a
>>>> decryptor.
>>>> Note that you'll need to apply your own file parsing logic to pull out
>>>> the records as you won't be able to use things like AvroIO/TextIO to do the
>>>> record parsing for you.
>>>>
>>>> 1:
>>>> https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/io/FileIO.html
>>>>
>>>>
>>>>
>>>> On Tue, Jul 31, 2018 at 10:27 AM [email protected] <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>> What is the best suggested way to read a file encrypted with a
>>>>> customer key which is also wrapped in KMS?
>>>>>
>>>>
>>>
>
