1: Using BoundedSource is not an antipattern per se. It is *recommended* when you are able to use the capabilities it provides over a ParDo; otherwise, it's recommended to use a ParDo: see https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api .
3: Assume that, when you apply two ParDos in a row to a collection, they
will typically be fused. Fusion (in Dataflow) is very aggressive and will
usually fuse things unless it is explicitly impossible (e.g. it's impossible
to fuse across a GroupByKey, and it also won't fuse across a PCollection
that gets passed to somebody as a side input). (A minimal sketch of a
ParDo-based read with a fusion-breaking GroupByKey appears after the quoted
thread below.)

On Mon, Jun 12, 2017 at 5:02 PM Dmitry Demeshchuk <[email protected]> wrote:

> Hi Cham,
>
> Thanks a lot for the clarifications!
>
> (1) I wouldn't mind using BoundedSource; it's just that my impression was
> that it was considered an anti-pattern. It seems like most of the logic
> will be left intact, though, so it shouldn't really be a problem. Is the
> BoundedSource API going to stay mostly the same after the introduction of
> SplittableDoFn?
>
> (2) Makes sense, thanks!
>
> (3) Thanks for the tip! This makes me wonder whether some sort of
> test-time fusion detector would be useful: it would take a pipeline,
> analyze it, and print out which transformations may be fused by the
> runner. But maybe it's just easier to remember that ParDo steps tend to
> fuse.
>
> On a side note, as you may see, I've been putting the column data into
> the PCollection (for example, here:
> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L72),
> but reading more code made me realize that I should have used side inputs
> for that instead?
>
> On Mon, Jun 12, 2017 at 4:44 PM, Chamikara Jayalath <[email protected]>
> wrote:
>
>> Hi Dmitry,
>>
>> Thanks for writing this. Some general comments.
>>
>> (1) Do you want to implement this using ParDos or using the BoundedSource
>> API [1]? Using the BoundedSource API has some benefits, such as support
>> for dynamic work rebalancing (see [2]), though using ParDos will be more
>> future-proof (dynamic work rebalancing will be supported sometime in the
>> future through the SplittableDoFn API [3][4]).
>>
>> (2) It seems like what the Java BigQuery source does is delete the
>> temporary table at the location you mentioned, and delete the temporary
>> exported files by mapping the directory path to the pipeline's temporary
>> path (which hopefully gets deleted by the runner). You should be able to
>> use a similar approach in the Python SDK. You should not delete exported
>> files in a ParDo, since a runner might rerun stages of a pipeline.
>>
>> (3) If you are using the ParDo-based approach, you should add a
>> GroupByKey between the ParDos 'GetS3Files' and 'LoadDataFromS3'.
>> Otherwise all of your ParDos might get fused into a single stage and you
>> might end up reading all the data from a single worker.
>>
>> Thanks,
>> Cham.
>>
>> [1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L73
>> [2] https://beam.apache.org/documentation/io/authoring-overview/#read-transforms
>> [3] https://s.apache.org/splittable-do-fn
>> [4] https://docs.google.com/document/d/1h_zprJrOilivK2xfvl4L42vaX4DMYGfH1YDmi-s_ozM/edit#
>>
>> On Mon, Jun 12, 2017 at 2:22 PM Dmitry Demeshchuk <[email protected]>
>> wrote:
>>
>>> Hi, list,
>>>
>>> I was hoping someone could give me a general code review on a Redshift
>>> source I wrote:
>>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1. It
>>> also relies on the modules `s3` and `config` from our internal library;
>>> I can add them too if needed, it was just more hassle to open up the
>>> entire repository, since it contains some company-specific code at the
>>> moment.
>>>
>>> My hope was also to find out whether you'd want me to file a pull
>>> request; we'd be totally fine with open-sourcing this piece, as well as
>>> some other AWS sources and sinks in the future.
>>>
>>> Finally, I have a specific question about cleanup. My impression was
>>> that
>>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L153
>>> would help make sure that there's no possible data loss after we delete
>>> the S3 files; however, in a personal conversation Eugene Kirpichov
>>> pointed out that this approach does not ensure the PCollection's
>>> persistence, and that Dataflow will just fuse multiple phases together.
>>>
>>> Also, Eugene pointed out that this cleanup problem has been worked
>>> around in the BigQuery source in the Java SDK. To my understanding,
>>> it's this one:
>>> https://github.com/apache/beam/blob/70e53e7dc5d58e4d9f88c6d4f1cff036429429c1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java#L100,
>>> but I don't yet have enough knowledge about the parity between the Java
>>> and Python SDKs to tell whether I can implement a Python source in a
>>> similar fashion (from what I remember, implementing sources is generally
>>> frowned upon, as opposed to writing a DoFn instead).
>>>
>>> Any thoughts and suggestions would be highly appreciated.
>>>
>>> Thank you.
>>>
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.
>>>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>
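Putting Cham's point (3) and the fusion caveat together: a ParDo-based
version of this Redshift read would list the exported S3 files in one step,
force a redistribution with a GroupByKey, and only then read the files, so
that 'GetS3Files' and 'LoadDataFromS3' cannot end up fused onto a single
worker. Below is a minimal sketch of that shape for the Python SDK;
ReadFromRedshiftViaS3, run_unload_query, and read_s3_file are hypothetical
names, not transforms from the gist or from Beam itself.

    import apache_beam as beam


    class ReadFromRedshiftViaS3(beam.PTransform):
        """Sketch: list exported files, break fusion, then read the files.

        The GroupByKey forces a materialization point, so the file-reading
        step runs in a separate stage and can be distributed across workers
        instead of being fused with the listing step.
        """

        def __init__(self, run_unload_query, read_s3_file):
            # run_unload_query: () -> iterable of S3 paths (placeholder).
            # read_s3_file: S3 path -> iterable of records (placeholder).
            super(ReadFromRedshiftViaS3, self).__init__()
            self._run_unload_query = run_unload_query
            self._read_s3_file = read_s3_file

        def expand(self, pbegin):
            run_unload_query = self._run_unload_query
            read_s3_file = self._read_s3_file
            return (
                pbegin
                | 'Seed' >> beam.Create([None])
                | 'GetS3Files' >> beam.FlatMap(lambda _: run_unload_query())
                | 'PairWithNone' >> beam.Map(lambda path: (path, None))
                | 'PreventFusion' >> beam.GroupByKey()
                | 'LoadDataFromS3' >> beam.FlatMap(
                    lambda kv: read_s3_file(kv[0]))
            )

A pipeline would then apply it as, roughly,
records = p | ReadFromRedshiftViaS3(run_unload_query, read_s3_file).
More recent SDK releases also provide a Reshuffle transform intended for
exactly this fusion-breaking purpose, which can stand in for the
PairWithNone/PreventFusion steps.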

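On the cleanup side, the approach Cham describes for the Java BigQuery
source translates roughly to writing the UNLOAD output under a
pipeline-level temporary prefix and cleaning that prefix up in one place
(or letting something like an S3 lifecycle rule expire it), instead of
deleting files from a ParDo that a runner may re-execute. A minimal sketch
of threading such a prefix through pipeline options follows; the
--s3_temp_location flag and the helper are hypothetical, not built into
Beam.

    import uuid

    from apache_beam.options.pipeline_options import PipelineOptions


    class RedshiftExportOptions(PipelineOptions):
        """Hypothetical options; --s3_temp_location is not a standard flag."""

        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_argument(
                '--s3_temp_location',
                help='S3 prefix for temporary UNLOAD output files.')


    def make_export_prefix(options):
        """Derive a per-run S3 prefix for UNLOAD output from the options.

        Each run gets a unique subdirectory under the shared temp prefix, so
        cleanup happens once for the whole prefix rather than per file inside
        a ParDo that might be retried or rerun.
        """
        opts = options.view_as(RedshiftExportOptions)
        return '%s/redshift-export-%s' % (
            opts.s3_temp_location.rstrip('/'), uuid.uuid4().hex)

Usage would look roughly like
make_export_prefix(PipelineOptions(['--s3_temp_location', 's3://my-bucket/tmp'])).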