@Eugene Kirpichov <[email protected]> that pattern would be a nice one to add to the Apache Beam patterns page:
https://beam.apache.org/documentation/patterns/overview/

Raised a JIRA; it would be a nice first PR for anyone wanting to contribute to Beam :D
https://issues.apache.org/jira/browse/BEAM-9222

On Thu, 30 Jan 2020 at 05:16, Eugene Kirpichov <[email protected]> wrote:

Note that
https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/,
while focused on the JdbcIO connector, still explains how to read from a
database in parallel. That solution would apply equally to your code,
because it is not specific to JDBC. You'd need to decide on a primary key
that supports efficient range scans, and create a pipeline like:
(create key ranges) | reshuffle | (ReadSnowflake each range).

On Wed, Jan 29, 2020 at 11:01 AM Alan Krumholz <[email protected]> wrote:

Hi Brice,

Thanks so much for your suggestion! I did the following and it seems to work:

    class ReadSnowflake(beam.DoFn):
        def process(self, _):
            import snowflake.connector
            ctx = snowflake.connector.connect(...)
            cs = ctx.cursor()
            cs.execute('SELECT a, b FROM t')
            while True:
                data = cs.fetchmany(1000)
                if len(data) == 0:
                    break
                for d in data:
                    yield {'a': d[0], 'b': d[1]}

    pipeline = beam.Pipeline()
    p = (
        pipeline
        | 'Empty start' >> beam.Create([''])
        | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))

I know this is not optimal, as it reads sequentially from Snowflake
(instead of in parallel, as I'm sure the BQ source does), but apart from
that, do you see any other problems (or possible improvements) with this
code?

Thank you so much!

On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <[email protected]> wrote:

I am using a pattern which I saw online (but can't seem to locate) that
performs I/O in a DoFn using a standard Python client.
I use this to read and write to Google Cloud Storage in streaming mode. You
could use this idea to perform almost any I/O. Depending on your use case
and workflow, this may be an approach you could consider. Shout if you need
some boilerplate.

It does look like native support is coming, and you know it is true because
I read it on the internet. :)
https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/

You could also set up an external service or endpoint to perform the query
and read the results into your pipeline in a step similar to the enrichment
idea here:
https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1

And you could always write your own connector. Not a task to be taken too
lightly, but it can be done.

HTH

On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <[email protected]> wrote:

Thanks for sharing this, Erik!

It would be really nice/convenient to have a Python option to do something
like that. Our ML team is mostly a Python shop, and we are also using
Kubeflow Pipelines to orchestrate our ML pipelines (mostly using their
Python SDK to author these).

Please let me know if you can think of any way we could do this with Python.

Thanks so much!

On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <[email protected]> wrote:

You can use the JDBC driver. Here's a blog that describes JDBC usage in
general:
https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/

On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <[email protected]> wrote:

Hi,
We are using Beam (on Dataflow) at my company and would like to use it to
read and write data from Snowflake.
Does anybody know if there is any source/sink available for Snowflake?

If not, what would be the easiest way to create those? (Maybe there is
something for SQLAlchemy that we could leverage?)

Thanks so much!

Alan
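For reference, Eugene's key-range suggestion earlier in the thread could be sketched roughly as below. This is a minimal, illustrative sketch, not a tested implementation: it assumes table `t` has an integer primary key `id`, and the connection arguments, key bounds, range size, and BigQuery configuration are all placeholders you would need to fill in.

```python
# Sketch of the "(create key ranges) | reshuffle | (read each range)" pattern.

def key_ranges(min_key, max_key, range_size):
    """Split [min_key, max_key] into half-open (lo, hi) ranges."""
    ranges = []
    lo = min_key
    while lo <= max_key:
        hi = min(lo + range_size, max_key + 1)
        ranges.append((lo, hi))
        lo = hi
    return ranges


def build_pipeline():
    # Requires apache-beam and snowflake-connector-python at runtime.
    import apache_beam as beam

    class ReadSnowflakeRange(beam.DoFn):
        def process(self, key_range):
            # Import inside process() so it runs on the workers, as in
            # Alan's original DoFn.
            import snowflake.connector
            lo, hi = key_range
            ctx = snowflake.connector.connect(...)  # fill in credentials
            cs = ctx.cursor()
            cs.execute('SELECT a, b FROM t WHERE id >= %s AND id < %s',
                       (lo, hi))
            while True:
                rows = cs.fetchmany(1000)
                if not rows:
                    break
                for a, b in rows:
                    yield {'a': a, 'b': b}

    pipeline = beam.Pipeline()
    _ = (
        pipeline
        | 'Key ranges' >> beam.Create(key_ranges(0, 10_000_000, 100_000))
        | 'Reshuffle' >> beam.Reshuffle()  # spread ranges across workers
        | 'Read range' >> beam.ParDo(ReadSnowflakeRange())
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))  # table/schema
    return pipeline
```

The Reshuffle between Create and the read step is what prevents the runner from fusing everything into a single worker, so each key range can be read in parallel.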
