Can you use a combiner[1] for the aggregations?

1: https://beam.apache.org/documentation/programming-guide/#combine
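
Something like this (a minimal sketch; `parsed` stands for a hypothetical
PCollection<KV<String, Long>> produced by the decrypt-and-parse ParDo
discussed below) keeps only small partial aggregates in memory on each
worker rather than the file contents:

  import org.apache.beam.sdk.transforms.Sum;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  // One (key, value) pair per parsed record goes in; one running total
  // per key comes out.
  PCollection<KV<String, Long>> totals = parsed.apply(Sum.longsPerKey());

Because a CombineFn is associative and commutative, the runner can
pre-combine partial results on each worker before the shuffle, so nothing
close to the whole file ever has to be buffered.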

On Thu, Aug 9, 2018 at 11:03 AM Aniruddh Sharma <[email protected]>
wrote:

> Thanks for the reply.
>
> In my application I need to do some aggregations, so I would have to store
> the contents of the file in memory, and that will not scale (unless I can
> keep reading it record by record and store it in some intermediate form; I
> am not allowed to store temporary intermediate unencrypted data on GCS).
> So in this case, do you suggest that I read the channel, write record by
> record to Pub/Sub, use it as a buffer, and build something to consume the
> records from Pub/Sub?
>
> Thanks
> Aniruddh
>
> On Thu, Aug 9, 2018 at 11:57 AM, Lukasz Cwik <[email protected]> wrote:
>
>> Sharding is runner specific. For example, Dataflow chooses shards at
>> sources (like TextIO/FileIO/AvroIO/...) and at GroupByKey. Dataflow uses a
>> capability exposed in Apache Beam to ask a source to "split" a shard that
>> is currently being processed into smaller shards. Dataflow also has the
>> ability to concatenate multiple shards, creating a larger shard. There is
>> a lot more detail about this in the execution model documentation[1].
>>
>> FileIO doesn't "read" the entire file; it just produces metadata records
>> which contain the name of the file and a few other properties. The ParDo
>> that accesses the FileIO output gets a handle to a ReadableByteChannel.
>> The ParDo that you write will likely use a decrypting ReadableByteChannel
>> that wraps the ReadableByteChannel provided by the FileIO output. This
>> means that the ParDo will only use an amount of memory approximately
>> equivalent to one record, plus any overhead the implementation needs to
>> perform decryption and to buffer the contents of the file. So the file
>> size doesn't matter; only the record size within the file matters.
>>
>> 1: https://beam.apache.org/documentation/execution-model/
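>>
>> A minimal sketch of that wrapping with javax.crypto, assuming AES in CTR
>> mode and a key already fetched from the KMS (the mode, key, and IV here
>> are placeholders for whatever your encryption scheme actually uses):
>>
>>   import java.nio.channels.Channels;
>>   import java.nio.channels.ReadableByteChannel;
>>   import javax.crypto.Cipher;
>>   import javax.crypto.CipherInputStream;
>>   import javax.crypto.spec.IvParameterSpec;
>>   import javax.crypto.spec.SecretKeySpec;
>>
>>   static ReadableByteChannel decrypting(
>>       ReadableByteChannel encrypted, byte[] key, byte[] iv)
>>       throws Exception {
>>     Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
>>     cipher.init(Cipher.DECRYPT_MODE,
>>         new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
>>     // CipherInputStream decrypts lazily as bytes are pulled, so memory
>>     // stays bounded by the read buffer, not by the file size.
>>     return Channels.newChannel(
>>         new CipherInputStream(Channels.newInputStream(encrypted), cipher));
>>   }
>>
>> The record parser then reads from the returned channel one record at a
>> time.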
>>
>>
>>
>> On Thu, Aug 9, 2018 at 6:46 AM Aniruddh Sharma <[email protected]>
>> wrote:
>>
>>> Thanks Lukasz for the help. It's very helpful and I understand it now,
>>> but I have one further question.
>>> Let's say, using the Java SDK, I implement this approach: 'Your best bet
>>> would be to use FileIO[1] followed by a ParDo that accesses the KMS to
>>> get the decryption key and wraps the readable channel with a decryptor.'
>>>
>>> Now, how does Beam create shards for functions called within a ParDo? Is
>>> it based on some data size that it assesses while executing inside the
>>> ParDo, and how are these shards executed on different machines?
>>>
>>> For example:
>>> Scenario 1 - The file size is 1 MB and default autoscaling is on. On
>>> execution, Beam decides to run the decryption (inside the ParDo) on one
>>> machine, and it works.
>>> Scenario 2a - The file size is 1 GB (the machine is n1-standard-4) and
>>> default autoscaling is on. It still decides to spin up only one machine,
>>> which runs out of heap memory and gets killed.
>>> Scenario 2b - The file size is 1 GB (the machine is n1-standard-4),
>>> autoscaling is off, and I force the job to start with 2 machines. It
>>> still decides to execute the file-decryption logic on one machine, the
>>> other machine does nothing, and the job still gets killed.
>>>
>>> Although I know the file is unsplittable (for decryption) and I actually
>>> want it to execute on only one machine, the work happens inside a ParDo
>>> as application logic, so I have not understood, in this context, when a
>>> ParDo decides to execute a function across different machines.
>>>
>>> For example, in Spark the partitions of an RDD get parallelized and
>>> executed either in the same executor (in a different wave or on another
>>> core) or on different executors.
>>>
>>> In the above example, how do I make sure that if the file size increases,
>>> the ParDo will not try to scale it? I am new to Beam, so apologies in
>>> advance if this query doesn't make sense.
>>>
>>> Thanks
>>> Aniruddh
>>>
>>> On Wed, Aug 8, 2018 at 11:52 AM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> a) FileIO produces elements which are file descriptors; each descriptor
>>>> represents one file. If you have many files to read, you will get
>>>> parallel processing since multiple files will be read concurrently. You
>>>> could get parallel reading within a single file, but you would need to
>>>> be able to split the decrypted file by knowing how to randomly seek in
>>>> the encrypted stream and start reading the decrypted stream without
>>>> having to decrypt everything before that point.
>>>>
>>>> b) You would need to add support for file decryption to the underlying
>>>> filesystem, like S3[1] or GCS[2].
>>>>
>>>> Note that for solutions a) and b), you can only get efficient parallel
>>>> reading within a single file if you can efficiently seek within the
>>>> "decrypted" stream. Many encryption methods do not allow for this and
>>>> typically require you to decrypt everything up to the point where you
>>>> want to read, making parallel reading of a single file extremely
>>>> inefficient. This is a common problem for compressed files as well: you
>>>> can't choose an arbitrary spot in a compressed file and start
>>>> decompressing without decompressing everything before it.
>>>>
>>>> 1:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
>>>> 2:
>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
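>>>>
>>>> As an aside, one construction that does permit this kind of seeking is
>>>> CTR mode, since the keystream for any block can be computed directly
>>>> from its index. A sketch of positioning a decryptor at an arbitrary
>>>> byte offset, assuming AES-CTR with a 16-byte big-endian counter IV (a
>>>> SecretKey `key` and the original `iv` are assumed in scope; your file
>>>> format may differ):
>>>>
>>>>   // Derive the counter block for the AES block containing `offset`.
>>>>   byte[] ctr = iv.clone();
>>>>   long blocks = offset / 16;
>>>>   for (int i = ctr.length - 1; i >= 0 && blocks > 0; i--) {
>>>>     long sum = (ctr[i] & 0xFFL) + (blocks & 0xFFL);
>>>>     ctr[i] = (byte) sum;
>>>>     blocks = (blocks >>> 8) + (sum >>> 8); // propagate the carry
>>>>   }
>>>>   Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
>>>>   cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(ctr));
>>>>   // Discard keystream for the partial block before `offset`.
>>>>   cipher.update(new byte[(int) (offset % 16)]);
>>>>
>>>> Whether anything like this applies depends entirely on how the files
>>>> were encrypted.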
>>>>
>>>> On Tue, Aug 7, 2018 at 1:06 PM Aniruddh Sharma <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks Lukasz for the reply.
>>>>> a) I have a further question; I may not be understanding this right.
>>>>> If I embed this logic in a ParDo, won't that limit the read to only
>>>>> one file, since I am reading directly in the ParDo, or will it still
>>>>> be a parallel operation? If it is a parallel operation, please
>>>>> elaborate.
>>>>>
>>>>> b) Is it possible to extend TextIO or AvroIO to decrypt with
>>>>> customer-supplied keys? If yes, please elaborate on how to do it.
>>>>>
>>>>> Thanks
>>>>> Aniruddh
>>>>>
>>>>> On Mon, Aug 6, 2018 at 8:30 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> I'm assuming you're talking about using the Java SDK. Additional
>>>>>> details about which SDK language, filesystem, and KMS you want to
>>>>>> access would be useful if the approach below doesn't work for you.
>>>>>>
>>>>>> Your best bet would be to use FileIO[1] followed by a ParDo that
>>>>>> accesses the KMS to get the decryption key and wraps the readable
>>>>>> channel with a decryptor.
>>>>>> Note that you'll need to apply your own file-parsing logic to pull
>>>>>> out the records, as you won't be able to use things like AvroIO or
>>>>>> TextIO to do the record parsing for you.
>>>>>>
>>>>>> 1:
>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/io/FileIO.html
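>>>>>>
>>>>>> A rough sketch of that shape (fetchKeyFromKms and decryptingStream
>>>>>> are hypothetical stand-ins for your KMS call and the channel
>>>>>> wrapping, and the filepattern is made up):
>>>>>>
>>>>>>   PCollection<String> records = p
>>>>>>       .apply(FileIO.match().filepattern("gs://my-bucket/in/*.enc"))
>>>>>>       .apply(FileIO.readMatches())
>>>>>>       .apply(ParDo.of(new DoFn<FileIO.ReadableFile, String>() {
>>>>>>         @ProcessElement
>>>>>>         public void process(ProcessContext c) throws IOException {
>>>>>>           FileIO.ReadableFile file = c.element();
>>>>>>           byte[] key = fetchKeyFromKms(file.getMetadata());
>>>>>>           try (BufferedReader reader = new BufferedReader(
>>>>>>               new InputStreamReader(decryptingStream(
>>>>>>                   Channels.newInputStream(file.open()), key)))) {
>>>>>>             String line;
>>>>>>             while ((line = reader.readLine()) != null) {
>>>>>>               c.output(line); // one record at a time
>>>>>>             }
>>>>>>           }
>>>>>>         }
>>>>>>       }));
>>>>>>
>>>>>> Each ReadableFile element is processed independently, so many files
>>>>>> decrypt in parallel even though a single file is read sequentially.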
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 31, 2018 at 10:27 AM [email protected] <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>> What is the best suggested way to read a file encrypted with a
>>>>>>> customer key which is itself wrapped in KMS?
>>>>>>>
>>>>>>
>>>>>
>>>
>
