Lovely! You absolutely have the right idea. Your comments are spot on, but a suboptimal solution that works is usually preferable to the optimal one that doesn't (exist).
I don't have any experience with the Snowflake connector, but if it can be kept around and reused, and is slow or expensive to create, you may want to consider a wrapper/class to manage the client/connector that you can use across your pipeline steps if needed.

Happy processing.
-Brice

On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz <[email protected]> wrote:

> Hi Brice,
>
> Thanks so much for your suggestion! I did the following and it seems to
> work:
>
> class ReadSnowflake(beam.DoFn):
>     def process(self, _):
>         import snowflake.connector
>         ctx = snowflake.connector.connect(...)
>         cs = ctx.cursor()
>         cs.execute('SELECT a, b FROM t')
>         while True:
>             data = cs.fetchmany(1000)
>             if len(data) == 0:
>                 break
>             for d in data:
>                 yield {'a': d[0], 'b': d[1]}
>
> pipeline = beam.Pipeline()
> p = (
>     pipeline
>     | 'Empty start' >> beam.Create([''])
>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
>
> I know this is not optimal, as it reads sequentially from Snowflake
> (instead of in parallel, as I'm sure the BQ source does), but apart from
> that, do you see any other problems (or possible improvements) with this
> code?
>
> Thank you so much!
>
> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <[email protected]> wrote:
>
>> I am using a pattern which I saw online (but can't seem to locate) which
>> performs I/O in a DoFn using a standard Python client. I use this to read
>> and write to Google Cloud Storage in streaming mode. You could use this
>> idea to perform almost any I/O. Depending on your use case and workflow,
>> this may be an approach you could consider. Shout if you need some
>> boilerplate.
>>
>> It does look like native support is coming, and you know it is true as I
>> read it on the internet. :)
>>
>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>
>> You could also set up an external service or endpoint to perform the
>> query and read the results into your pipeline in a pipeline step, similar
>> to the enrichment idea here:
>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>
>> And you could always write your own connector. Not a task to be taken
>> too lightly, but it can be done.
>>
>> HTH
>>
>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <[email protected]> wrote:
>>
>>> Thanks for sharing this, Erik!
>>>
>>> It would be really nice/convenient to have a Python option to do
>>> something like that. Our ML team is mostly a Python shop, and we are
>>> also using Kubeflow Pipelines to orchestrate our ML pipelines (mostly
>>> using their Python SDK to author these).
>>>
>>> Please let me know if you can think of any way we could do this with
>>> Python.
>>>
>>> Thanks so much!
>>>
>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <[email protected]> wrote:
>>>
>>>> You can use the JDBC driver. Here's a blog that describes JDBC usage
>>>> in general:
>>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>>
>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>> We are using Beam (and Dataflow) at my company and would like to use
>>>>> it to read and write data from Snowflake.
>>>>>
>>>>> Does anybody know if there are any source/sink available for
>>>>> Snowflake?
>>>>>
>>>>> If not, what would be the easiest way to create those? (Maybe there
>>>>> is something for SQLAlchemy that we could leverage for that?)
>>>>>
>>>>> Thanks so much!
>>>>>
>>>>> Alan
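[Editor's sketch] The "wrapper/class to manage the client/connector" idea at the top of the thread can be shown without Snowflake itself: create the expensive client lazily on first use, cache it so every pipeline step reuses the same instance, and close it when done. `LazyClient` and `FakeConn` are hypothetical names invented for this illustration (not from the thread or any library); in a real Beam DoFn the same pattern usually lives in `setup()`/`teardown()`, with the factory building the Snowflake connection.

```python
class LazyClient:
    """Create an expensive client once, reuse it, close it on demand.

    `factory` is a zero-argument callable that builds the client, e.g.
    lambda: snowflake.connector.connect(...) in the thread's use case.
    """

    def __init__(self, factory):
        self._factory = factory
        self._client = None

    def get(self):
        # Build the client only on first use, then hand back the cached one.
        if self._client is None:
            self._client = self._factory()
        return self._client

    def close(self):
        if self._client is not None:
            self._client.close()
            self._client = None


# Stand-in "connection" that counts how many times it was constructed,
# so we can see that the wrapper really builds only one client.
class FakeConn:
    instances = 0

    def __init__(self):
        FakeConn.instances += 1
        self.closed = False

    def close(self):
        self.closed = True


wrapper = LazyClient(FakeConn)
c1 = wrapper.get()
c2 = wrapper.get()
print(FakeConn.instances)  # 1: the factory ran exactly once
print(c1 is c2)            # True: both steps got the same client
wrapper.close()
print(c1.closed)           # True: cleaned up on teardown
```

In a Beam pipeline, calling the factory in `DoFn.setup()` and closing in `DoFn.teardown()` gives one connection per worker instance rather than one per element, which is what makes a slow-to-create connector affordable.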
