On Tue, Jun 13, 2017 at 10:35 AM Sourabh Bajaj <[email protected]>
wrote:

> Thinking more about this, I think we need to consider a few more things:
>
> 1. Using the temporary directory as S3 won't be possible with the dataflow
> runner as the Runner won't have the credentials to clean up the files in
> S3. A work around could be to use a file expiration policy on the bucket in
> S3 to delete the files after 7 days and not do the cleaning in the
> pipeline, and use this bucket in your unload jobs.
>
> 2. The above also brings up another point that is that currently we don't
> have a notion of a finalize step in the pipeline. It is also hard to build
> this manually for some pipeline sinks that return a PDone instead of any
> metadata on what the sink did. This should be a separate thread on the dev
> list.
>

Agree that currently there is no runner/filesystem-independent way to
enforce cleanup. Solutions that enforce a certain structure to the pipeline
graph or cleaning-up after a blocking pipeline execution does not work for
this case since we are talking about enforcing cleanup after a read
transform. I agree that best solution currently might be to enforce some
expiration policy on temporary files.

Regarding your comment "but reading more code made me realize that I should
have used side inputs for that instead", I agree that using a side-input
better here since otherwise you are creating a copy of the column schema
for every record of the menifest file.

Thanks,
Cham


>
> 3. For the point about keys one of the ways to have them available to each
> worker would be to create a package similar to the julia-set example and
> then in the setup.py of that package pull data from KMS and then
> instantiate appropriate environment variables that are needed for access to
> AWS. This is just one way to do this and others might have better ideas.
>
> -Sourabh
>
> On Mon, Jun 12, 2017 at 5:38 PM Dmitry Demeshchuk <[email protected]>
> wrote:
>
>> I really like the idea of a general S3 FileSystem first, sounds like it
>> may also make the dynamic rebalancing work way easier.
>>
>> As for a generic PostgresIO source – yes, that's also in plans, because
>> we rely on regular Postgres as well. In Redshift, using UNLOAD on large
>> datasets is still the recommended approach, and we'd be using Redshift more
>> extensively, so I decided to start with this source. I think these two
>> sources will be complimentary to each other.
>>
>> As for the AWS keys – that's actually another question I wanted to ask:
>> what's the recommended approach to storing and passing around the secrets
>> like access tokens, passwords and such? The plan I originally had was to
>> instead make the Dataflow nodes make the appropriate API calls to Google
>> Cloud Storage and Google KMS. It feels, however, that it's generally safer
>> and easier to reuse when secrets are defined at object configuration time
>> and then just passed in serialized fashion to Dataflow.
>>
>> On Mon, Jun 12, 2017 at 5:23 PM, Sourabh Bajaj <[email protected]>
>> wrote:
>>
>>> Couple of more things to add on what Cham mentioned in his email.
>>>
>>> 1. We currently don't have an implementation for the S3 FileSystem so
>>> adding that might be a good starting point before you implement the unload
>>> load patterns as you won't be able to use the pipeline's temporary
>>> directory as an S3 path without that.
>>>
>>> 2. Another option based on the size of the Redshift tables might be to
>>> directly use a psycopg2 to read the data instead of doing the unload. This
>>> can be a generic PostgresIO in python.
>>>
>>> On a side node: This is going to pass the users AWS keys on the wire to
>>> whatever runner. Might be good to make that optional and see if the machine
>>> IAM role can be used as well as adding a note to warn about this so that
>>> people can create keys with minimal access.
>>>
>>>
>>>
>>> On Mon, Jun 12, 2017 at 5:06 PM Eugene Kirpichov <[email protected]>
>>> wrote:
>>>
>>>> 1: Using BoundedSource is not an antipattern per se. It is
>>>> *recommended* in case you are able to use the capabilities that it provides
>>>> over a ParDo - otherwise, it's recommended to use ParDo: see
>>>> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
>>>> .
>>>>
>>>> 3: assume that, when you apply two ParDo's in a row to a collection,
>>>> they will typically be fused. Fusion (in Dataflow) is very aggressive and
>>>> will usually fuse things unless it is explicitly impossible (e.g. it's
>>>> impossible to fuse across a GroupByKey, and it also won't fuse across a
>>>> PCollection that gets passed to somebody as a side input).
>>>>
>>>> On Mon, Jun 12, 2017 at 5:02 PM Dmitry Demeshchuk <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Cham,
>>>>>
>>>>> Thanks a lot for the clarifications!
>>>>>
>>>>> (1) I wouldn't mind to use BoundedSource, it's just that my impression
>>>>> was that it was considered to be an anti-pattern. Seems like most of the
>>>>> logic will be left intact though, so shouldn't really be a problem. Is
>>>>> BoundedSource API going to stay mostly the same after the introduction of
>>>>> SplittableDoFn?
>>>>>
>>>>> (2) Makes sense, thanks!
>>>>>
>>>>> (3) Thanks for the tip! This makes me wonder if some sort of test-time
>>>>> fusion detector would be useful, which would take a pipeline, analyze it
>>>>> and print out which transformations may be fused by the runner. But maybe
>>>>> it's just easier to remember that ParDo steps tend to fuse.
>>>>>
>>>>> On a side note, as you may see, I've been putting the columns data
>>>>> into the PCollection (for example, here:
>>>>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L72),
>>>>> but reading more code made me realize that I should have used side inputs
>>>>> for that instead?
>>>>>
>>>>> On Mon, Jun 12, 2017 at 4:44 PM, Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Dmitry,
>>>>>>
>>>>>> Thanks for writing this. Some general comments.
>>>>>>
>>>>>> (1) Do you want to implement this using ParDos or using BoundedSource
>>>>>> [1] API. Using BoundedSource API has some benefits such as support for
>>>>>> dynamic work rebalancing (see [2]) though using ParDos will be more
>>>>>> future-proof (dynamic work rebalancing will be supported sometime in the
>>>>>> future through SplittableDoFn API [3][4]).
>>>>>>
>>>>>> (2) Seems like what Java BigQuery source is doing is, deleting the
>>>>>> temporary table at the location you mentioned, and deleting temporary
>>>>>> exported files by mapping the directory path to pipeline's temporary path
>>>>>> (which hopefully gets deleted by the runner). You should be able to 
>>>>>> utilize
>>>>>> a similar approach in Python SDK. You should not delete exported files 
>>>>>> in a
>>>>>> Pardo since a runner might rerun stages of a pipeline.
>>>>>>
>>>>>> (3) If you are using ParDo based approach, you should add a
>>>>>> GroupByKey between the ParDos 'GetS3Files' and 'LoadDataFromS3'. 
>>>>>> Otherwise
>>>>>> all of you ParDos might get fused to a single stage and you might end up
>>>>>> reading all the data from a single worker.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L73
>>>>>> [2]
>>>>>> https://beam.apache.org/documentation/io/authoring-overview/#read-transforms
>>>>>> [3] https://s.apache.org/splittable-do-fn
>>>>>> [4]
>>>>>> https://docs.google.com/document/d/1h_zprJrOilivK2xfvl4L42vaX4DMYGfH1YDmi-s_ozM/edit#
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 12, 2017 at 2:22 PM Dmitry Demeshchuk <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi, list,
>>>>>>>
>>>>>>> I was hoping someone could give me a general code review on a
>>>>>>> Redshift source I wrote:
>>>>>>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1.
>>>>>>> It also relies on modules `s3` and `config` from our internal library, I
>>>>>>> can add them too if needed, it just was more hassle to open up the 
>>>>>>> entire
>>>>>>> repository with the code, since it contains some company-specific code 
>>>>>>> at
>>>>>>> the moment.
>>>>>>>
>>>>>>> My hope was also to find out if you wanted me to file a pull
>>>>>>> request, we'd be totally fine to open source this piece, as well as some
>>>>>>> other AWS sources and sinks in the future.
>>>>>>>
>>>>>>> Finally, I have a specific question about cleanup. My impression was
>>>>>>> that
>>>>>>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L153
>>>>>>> would help making sure that there's no possible data loss after we 
>>>>>>> delete
>>>>>>> the S3 files, however, in a personal conversation Eugene Kirpichev 
>>>>>>> pointed
>>>>>>> out that this way does not ensure the PCollection persistence, and that
>>>>>>> Dataflow will just fuse multiple phases together.
>>>>>>>
>>>>>>> Also, Eugene pointed out that this cleanup problem has been worked
>>>>>>> around in the BigQuery source in Java SDK. To my understanding, it's 
>>>>>>> this
>>>>>>> one:
>>>>>>> https://github.com/apache/beam/blob/70e53e7dc5d58e4d9f88c6d4f1cff036429429c1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java#L100,
>>>>>>> however I don't yet have enough knowledge about the parity between Java 
>>>>>>> and
>>>>>>> Python SDKs to tell whether I can or cannot implement a Python source 
>>>>>>> in a
>>>>>>> similar fashion (from what I remember, implementing sources is generally
>>>>>>> frowned upon, as opposed to writing a DoFn instead).
>>>>>>>
>>>>>>> Any thoughts and suggestions would be highly appreciated.
>>>>>>>
>>>>>>> Thank you.
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Dmitry Demeshchuk.
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Dmitry Demeshchuk.
>>>>>
>>>>
>>
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>>
>

Reply via email to