You could use the beam.transforms.util.BatchElements transform (also exposed as beam.BatchElements) to batch rows into larger chunks.
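Concretely, BatchElements turns a PCollection of rows into a PCollection of lists of rows, so the write step can issue one multi-row INSERT per batch instead of one per row. A rough sketch of what that write could look like (the names, batch sizes, and multi-row VALUES wiring are illustrative and not tested against Snowflake; binding parameters would be safer than string formatting):

```python
# Illustrative pipeline wiring once BatchElements is in place:
#   | 'Batch rows' >> beam.BatchElements(min_batch_size=500, max_batch_size=1000)
#   | 'Write to Snowflake' >> beam.Map(write_snowflake_batch)

def build_batch_insert(rows):
    """Build one multi-row INSERT for a batch of {'a': ..., 'b': ...} dicts."""
    values = ', '.join(
        '({a}, {b})'.format(a=row['a'], b=row['b']) for row in rows
    )
    return 'INSERT INTO t(a, b) VALUES {}'.format(values)

def write_snowflake_batch(rows):
    # In the real pipeline this would open (or reuse) a connection:
    #   import snowflake.connector
    #   ctx = snowflake.connector.connect(...)
    #   ctx.cursor().execute(build_batch_insert(rows))
    # Binding parameters instead of formatting strings would also avoid
    # quoting/injection issues for non-numeric columns.
    return rows
```

One round trip per thousand rows is a big improvement over one per row, and BatchElements handles the chunking without the GroupByKey-with-random-keys workaround.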
On Wed, Jan 29, 2020 at 12:01 PM Alan Krumholz <[email protected]> wrote:
>
> Thanks Brice! I'll look into wrapping the connector.
>
> One more question.
> I'm trying now to develop a sink too. This is what I have:
>
> def writeSnowflake(row):
>     import snowflake.connector
>     ctx = snowflake.connector.connect(...)
>     cs = ctx.cursor()
>     cs.execute(
>         'INSERT INTO t(a, b) VALUES ({a}, {b})'.format(
>             a = str(row['a']),
>             b = str(row['b'])
>         )
>     )
>     return row
>
> pipeline = beam.Pipeline(...)
> p = (
>     pipeline
>     | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
>     | 'Write to SnowFlake' >> beam.Map(writeSnowflake)
> )
>
> This seems to (slowly) work, but it feels extremely inefficient to send an
> INSERT query to the DW for each row in the dataset.
> Is there an easy way to have the pipeline stack all my data rows into
> chunks of 1,000 so I can insert them in chunks instead? I'm mostly curious
> about how to have the pipeline pass 1K rows at a time to "writeSnowflake()"
> instead of passing them one by one.
> Maybe by using a GroupByKey transform with randomly sampled keys to
> create chunks of the desired size? (Or can you think of a better way to
> achieve this?)
>
> Thank you so much for all your help!
>
> On Wed, Jan 29, 2020 at 11:30 AM Brice Giesbrecht <[email protected]> wrote:
>>
>> Lovely! You absolutely have the right idea.
>>
>> Your comments are spot on, but the suboptimal solution that works is usually
>> preferable to the optimal one that doesn't (exist).
>>
>> I don't have any experience with the Snowflake connector, but if it can be
>> kept around and reused and is slow or expensive to create, you may want to
>> consider a wrapper/class to manage the client/connector that you can use
>> across your pipeline steps if needed.
>>
>> Happy processing.
>> -Brice
>>
>> On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz <[email protected]> wrote:
>>>
>>> Hi Brice,
>>>
>>> Thanks so much for your suggestion!
>>> I did the following and it seems to work:
>>>
>>> class ReadSnowflake(beam.DoFn):
>>>     def process(self, _):
>>>         import snowflake.connector
>>>         ctx = snowflake.connector.connect(...)
>>>         cs = ctx.cursor()
>>>         cs.execute('SELECT a, b FROM t')
>>>         while True:
>>>             data = cs.fetchmany(1000)
>>>             if len(data) == 0:
>>>                 break
>>>             for d in data:
>>>                 yield {'a': d[0], 'b': d[1]}
>>>
>>> pipeline = beam.Pipeline()
>>> p = (
>>>     pipeline
>>>     | 'Empty start' >> beam.Create([''])
>>>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>>>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...)
>>> )
>>>
>>> I know this is not optimal, as it reads sequentially from Snowflake
>>> (instead of in parallel, as I'm sure the BQ source does), but apart
>>> from that, do you see any other problems (or possible improvements) with
>>> this code?
>>>
>>> Thank you so much!
>>>
>>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <[email protected]> wrote:
>>>>
>>>> I am using a pattern which I saw online (but can't seem to locate) which
>>>> performs I/O in a DoFn using a standard Python client. I use this to read
>>>> and write to Google Cloud Storage in streaming mode. You could use this
>>>> idea to perform almost any I/O. Depending on your use case and workflow,
>>>> this may be an approach you could consider. Shout if you need some
>>>> boilerplate.
>>>>
>>>> It does look like native support is coming, and you know it is true as I
>>>> read it on the internet. :)
>>>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>>>
>>>> You could also set up an external service or endpoint to perform the query
>>>> and read the results into your pipeline in a pipeline step, similar to the
>>>> enrichment idea here:
>>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>>
>>>> And you could always write your own connector. Not a task to be taken too
>>>> lightly, but it can be done.
>>>>
>>>> HTH
>>>>
>>>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <[email protected]> wrote:
>>>>>
>>>>> Thanks for sharing this, Erik!
>>>>>
>>>>> It would be really nice/convenient to have a Python option to do
>>>>> something like that. Our ML team is mostly a Python shop, and we are also
>>>>> using Kubeflow Pipelines to orchestrate our ML pipelines (mostly using
>>>>> their Python SDK to author these).
>>>>>
>>>>> Please let me know if you can think of any way we could do this with
>>>>> Python.
>>>>>
>>>>> Thanks so much!
>>>>>
>>>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <[email protected]> wrote:
>>>>>>
>>>>>> You can use the JDBC driver. Here's a blog that describes JDBC usage in
>>>>>> general:
>>>>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>>>>
>>>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>> We are using Beam (and Dataflow) at my company and would like to use it
>>>>>>> to read and write data from Snowflake.
>>>>>>>
>>>>>>> Does anybody know if there are any source/sink available for Snowflake?
>>>>>>>
>>>>>>> If not, what would be the easiest way to create those? (Maybe there is
>>>>>>> something for SQLAlchemy that we could leverage for that?)
>>>>>>>
>>>>>>> Thanks so much!
>>>>>>>
>>>>>>> Alan
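The client-wrapper idea Brice mentions above maps naturally onto Beam's DoFn lifecycle: setup() runs once per DoFn instance on a worker, so the connection can be created there and reused by every process() call, then closed in teardown(). A minimal sketch of that shape (in a real pipeline the class would subclass beam.DoFn; the injected connection factory is a stand-in for snowflake.connector.connect, and the %s placeholders assume the connector's default pyformat parameter binding):

```python
# Sketch of a connection-reusing write DoFn: one connection per worker
# instance instead of one per element.

class SnowflakeWriteFn:  # in a real pipeline: class SnowflakeWriteFn(beam.DoFn):
    def __init__(self, connect_fn):
        # connect_fn would be e.g. lambda: snowflake.connector.connect(...)
        self._connect_fn = connect_fn
        self._conn = None

    def setup(self):
        # Called once per DoFn instance, before any bundle is processed.
        self._conn = self._connect_fn()

    def process(self, row):
        # Reuses the worker-level connection; parameters are bound rather
        # than formatted into the SQL string.
        cur = self._conn.cursor()
        cur.execute('INSERT INTO t(a, b) VALUES (%s, %s)', (row['a'], row['b']))
        yield row

    def teardown(self):
        # Called once when the DoFn instance is discarded.
        if self._conn is not None:
            self._conn.close()
```

Wired in as `beam.ParDo(SnowflakeWriteFn(...))`, this avoids paying the connection cost on every element; combined with batching, it also cuts the number of INSERT round trips.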
