On Tue, Jun 13, 2017 at 10:35 AM Sourabh Bajaj <[email protected]> wrote:
> Thinking more about this, I think we need to consider a few more things:
>
> 1. Using the temporary directory on S3 won't be possible with the Dataflow runner, as the runner won't have the credentials to clean up the files in S3. A workaround could be to use a file expiration policy on the S3 bucket to delete the files after 7 days instead of doing the cleaning in the pipeline, and to use this bucket in your unload jobs.
>
> 2. The above also brings up another point: we currently don't have a notion of a finalize step in the pipeline. It is also hard to build this manually for some pipeline sinks that return a PDone instead of any metadata on what the sink did. This should be a separate thread on the dev list.

Agreed that there is currently no runner/filesystem-independent way to enforce cleanup. Solutions that enforce a certain structure on the pipeline graph, or that clean up after a blocking pipeline execution, do not work for this case, since we are talking about enforcing cleanup after a read transform. I agree that the best solution currently might be to enforce an expiration policy on the temporary files.

Regarding your comment "but reading more code made me realize that I should have used side inputs for that instead": I agree that a side input is better here, since otherwise you are creating a copy of the column schema for every record of the manifest file.
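To make that concrete, here is a rough sketch of the side-input version (purely illustrative: the transform names, paths and schema below are made up, not taken from your gist). The column schema lives in a single-element PCollection and is handed to the manifest-processing step via AsSingleton, so workers share one copy instead of carrying the schema in every record:

import apache_beam as beam


def attach_schema(manifest_line, columns):
    # `columns` arrives through the side input: one shared value per bundle,
    # rather than a copy of the schema embedded in every manifest record.
    return {'path': manifest_line.strip(), 'columns': columns}


with beam.Pipeline() as p:
    # Hypothetical inputs: manifest entries produced by UNLOAD, and a
    # single-element PCollection holding the column schema read from Redshift.
    manifest = p | 'ReadManifest' >> beam.Create(
        ['s3://my-bucket/tmp/part_0000', 's3://my-bucket/tmp/part_0001'])
    schema = p | 'ColumnSchema' >> beam.Create([['id', 'name', 'created_at']])

    records = manifest | 'AttachSchema' >> beam.Map(
        attach_schema, columns=beam.pvalue.AsSingleton(schema))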
Thanks,
Cham

> 3. For the point about keys, one of the ways to have them available to each worker would be to create a package similar to the julia-set example, and then in the setup.py of that package pull the data from KMS and instantiate the environment variables that are needed for access to AWS. This is just one way to do it and others might have better ideas.
>
> -Sourabh
>
> On Mon, Jun 12, 2017 at 5:38 PM Dmitry Demeshchuk <[email protected]> wrote:
>
>> I really like the idea of a general S3 FileSystem first; sounds like it may also make the dynamic rebalancing work way easier.
>>
>> As for a generic PostgresIO source: yes, that's also in the plans, because we rely on regular Postgres as well. In Redshift, using UNLOAD on large datasets is still the recommended approach, and we'll be using Redshift more extensively, so I decided to start with this source. I think these two sources will be complementary to each other.
>>
>> As for the AWS keys, that's actually another question I wanted to ask: what's the recommended approach to storing and passing around secrets like access tokens, passwords and such? The plan I originally had was to instead make the Dataflow nodes make the appropriate API calls to Google Cloud Storage and Google KMS. It feels, however, that it's generally safer and easier to reuse when secrets are defined at object configuration time and then just passed in serialized form to Dataflow.
>>
>> On Mon, Jun 12, 2017 at 5:23 PM, Sourabh Bajaj <[email protected]> wrote:
>>
>>> A couple more things to add to what Cham mentioned in his email.
>>>
>>> 1. We currently don't have an implementation of the S3 FileSystem, so adding that might be a good starting point before you implement the unload/load patterns, as you won't be able to use the pipeline's temporary directory as an S3 path without it.
>>>
>>> 2. Another option, depending on the size of the Redshift tables, might be to use psycopg2 directly to read the data instead of doing the unload. This could be a generic PostgresIO in Python.
>>>
>>> On a side note: this is going to pass the user's AWS keys on the wire to whatever runner. It might be good to make that optional and see if the machine's IAM role can be used instead, as well as adding a note to warn about this so that people can create keys with minimal access.
>>>
>>> On Mon, Jun 12, 2017 at 5:06 PM Eugene Kirpichov <[email protected]> wrote:
>>>
>>>> 1: Using BoundedSource is not an antipattern per se. It is *recommended* in case you are able to use the capabilities it provides over a ParDo; otherwise, it's recommended to use ParDo: see https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api .
>>>>
>>>> 3: Assume that, when you apply two ParDos in a row to a collection, they will typically be fused. Fusion (in Dataflow) is very aggressive and will usually fuse things unless it is explicitly impossible (e.g. it's impossible to fuse across a GroupByKey, and it also won't fuse across a PCollection that gets passed to somebody as a side input).
>>>>
>>>> On Mon, Jun 12, 2017 at 5:02 PM Dmitry Demeshchuk <[email protected]> wrote:
>>>>
>>>>> Hi Cham,
>>>>>
>>>>> Thanks a lot for the clarifications!
>>>>>
>>>>> (1) I wouldn't mind using BoundedSource; it's just that my impression was that it was considered an anti-pattern. It seems like most of the logic will be left intact though, so it shouldn't really be a problem. Is the BoundedSource API going to stay mostly the same after the introduction of SplittableDoFn?
>>>>>
>>>>> (2) Makes sense, thanks!
>>>>>
>>>>> (3) Thanks for the tip! This makes me wonder if some sort of test-time fusion detector would be useful, which would take a pipeline, analyze it and print out which transformations may be fused by the runner. But maybe it's just easier to remember that ParDo steps tend to fuse.
>>>>>
>>>>> On a side note, as you may see, I've been putting the columns data into the PCollection (for example, here: https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L72), but reading more code made me realize that I should have used side inputs for that instead?
>>>>>
>>>>> On Mon, Jun 12, 2017 at 4:44 PM, Chamikara Jayalath <[email protected]> wrote:
>>>>>
>>>>>> Hi Dmitry,
>>>>>>
>>>>>> Thanks for writing this. Some general comments.
>>>>>>
>>>>>> (1) Do you want to implement this using ParDos or using the BoundedSource [1] API? Using the BoundedSource API has some benefits, such as support for dynamic work rebalancing (see [2]), though using ParDos will be more future-proof (dynamic work rebalancing will be supported sometime in the future through the SplittableDoFn API [3][4]).
>>>>>>
>>>>>> (2) It seems that what the Java BigQuery source does is delete the temporary table at the location you mentioned, and delete the temporary exported files by mapping the directory path to the pipeline's temporary path (which hopefully gets deleted by the runner). You should be able to use a similar approach in the Python SDK. You should not delete exported files in a ParDo, since a runner might rerun stages of a pipeline.
>>>>>>
>>>>>> (3) If you are using the ParDo-based approach, you should add a GroupByKey between the ParDos 'GetS3Files' and 'LoadDataFromS3'. Otherwise all of your ParDos might get fused into a single stage and you might end up reading all the data from a single worker.
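To spell out (3) a bit more, here is a minimal sketch of the shape I had in mind (the DoFns, names and element shapes below are placeholders, not the ones from the gist). Pairing each file path with a dummy key and grouping forces the list of files to be materialized, so the downstream reads get redistributed across workers instead of staying fused with the step that produced the list:

import apache_beam as beam


class GetS3Files(beam.DoFn):
    # Placeholder: yields one S3 path per file produced by the UNLOAD.
    def process(self, unload_result):
        for path in unload_result['paths']:
            yield path


class LoadDataFromS3(beam.DoFn):
    # Placeholder: reads and parses a single S3 file (details omitted).
    def process(self, path):
        yield {'path': path}


with beam.Pipeline() as p:
    _ = (p
         | 'Unload' >> beam.Create(
             [{'paths': ['s3://my-bucket/tmp/part_0000',
                         's3://my-bucket/tmp/part_0001']}])
         | 'GetS3Files' >> beam.ParDo(GetS3Files())
         # The GroupByKey below acts as a fusion break: the file list is
         # materialized here, so the reads that follow are distributed
         # instead of both ParDos running fused on one worker.
         | 'PairWithKey' >> beam.Map(lambda path: (path, None))
         | 'GroupToPreventFusion' >> beam.GroupByKey()
         | 'DropKey' >> beam.Map(lambda kv: kv[0])
         | 'LoadDataFromS3' >> beam.ParDo(LoadDataFromS3()))

(If your SDK version has it, beam.Reshuffle wraps this same pair/group/ungroup pattern in a single transform.)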
>>>>>> Thanks,
>>>>>> Cham.
>>>>>>
>>>>>> [1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L73
>>>>>> [2] https://beam.apache.org/documentation/io/authoring-overview/#read-transforms
>>>>>> [3] https://s.apache.org/splittable-do-fn
>>>>>> [4] https://docs.google.com/document/d/1h_zprJrOilivK2xfvl4L42vaX4DMYGfH1YDmi-s_ozM/edit#
>>>>>>
>>>>>> On Mon, Jun 12, 2017 at 2:22 PM Dmitry Demeshchuk <[email protected]> wrote:
>>>>>>
>>>>>>> Hi, list,
>>>>>>>
>>>>>>> I was hoping someone could give me a general code review on a Redshift source I wrote: https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1. It also relies on the modules `s3` and `config` from our internal library; I can add them too if needed, it was just more hassle to open up the entire repository with the code, since it contains some company-specific code at the moment.
>>>>>>>
>>>>>>> My hope was also to find out whether you'd want me to file a pull request; we'd be totally fine with open-sourcing this piece, as well as some other AWS sources and sinks in the future.
>>>>>>>
>>>>>>> Finally, I have a specific question about cleanup. My impression was that https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L153 would help make sure that there's no possible data loss after we delete the S3 files; however, in a personal conversation Eugene Kirpichov pointed out that this approach does not ensure the PCollection's persistence, and that Dataflow will just fuse multiple phases together.
>>>>>>>
>>>>>>> Also, Eugene pointed out that this cleanup problem has been worked around in the BigQuery source in the Java SDK. To my understanding, it's this one: https://github.com/apache/beam/blob/70e53e7dc5d58e4d9f88c6d4f1cff036429429c1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java#L100, however I don't yet have enough knowledge about the parity between the Java and Python SDKs to tell whether I can or cannot implement a Python source in a similar fashion (from what I remember, implementing sources is generally frowned upon, as opposed to writing a DoFn instead).
>>>>>>>
>>>>>>> Any thoughts and suggestions would be highly appreciated.
>>>>>>>
>>>>>>> Thank you.
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Dmitry Demeshchuk.
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Dmitry Demeshchuk.
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
