Hi Robert,

beam.util.GroupIntoBatches(x) works perfectly. Thanks so much!
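For readers who find this thread later, here is a minimal sketch of what the batched sink could look like, assuming the t(a, b) table from the code quoted below. The connection settings, the BigQuery query and the choice of ten random keys are placeholders, and the write relies on the Snowflake connector's default pyformat parameter binding:

    import random

    import apache_beam as beam
    from apache_beam.transforms.util import GroupIntoBatches

    SNOWFLAKE_PARAMS = {}  # placeholder: account, user, password, database, schema, ...

    class WriteSnowflakeBatch(beam.DoFn):
        def process(self, keyed_batch):
            import snowflake.connector
            _, rows = keyed_batch  # GroupIntoBatches emits (key, [row, row, ...])
            ctx = snowflake.connector.connect(**SNOWFLAKE_PARAMS)
            try:
                # One INSERT per batch instead of one per row.
                ctx.cursor().executemany(
                    'INSERT INTO t(a, b) VALUES (%(a)s, %(b)s)', list(rows))
            finally:
                ctx.close()

    with beam.Pipeline() as pipeline:
        _ = (
            pipeline
            | 'Read from BigQuery' >> beam.io.Read(
                beam.io.BigQuerySource(query='SELECT a, b FROM ds.t'))  # placeholder query
            | 'Random key' >> beam.Map(lambda row: (random.randint(0, 9), row))
            | 'Batches of 1000' >> GroupIntoBatches(1000)
            | 'Write to Snowflake' >> beam.ParDo(WriteSnowflakeBatch()))

The random key only exists to give GroupIntoBatches something to group on; any small key space works, and a larger one allows more parallel writers.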
On Wed, Jan 29, 2020 at 12:58 PM Robert Bradshaw <[email protected]> wrote:

> You could use the beam.util.BatchElements transform to batch rows into
> larger chunks.
>
> On Wed, Jan 29, 2020 at 12:01 PM Alan Krumholz
> <[email protected]> wrote:
> >
> > Thanks Brice! I'll look into wrapping the connector.
> >
> > One more question.
> > I'm now trying to develop a sink too. This is what I have:
> >
> > def writeSnowflake(row):
> >     import snowflake.connector
> >     ctx = snowflake.connector.connect(...)
> >     cs = ctx.cursor()
> >     cs.execute(
> >         'INSERT INTO t(a, b) VALUES ({a}, {b})'.format(
> >             a = str(row['a']),
> >             b = str(row['b'])
> >         )
> >     )
> >     return row
> >
> > pipeline = beam.Pipeline(...)
> > p = (
> >     pipeline
> >     | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
> >     | 'Write to SnowFlake' >> beam.Map(writeSnowflake)
> > )
> >
> > This seems to (slowly) work, but it feels extremely inefficient to send
> > an INSERT query to the DW for each row in the dataset.
> > Is there an easy way to have the pipeline stack my data rows into chunks
> > of 1,000 so I can insert them in chunks instead? I'm mostly curious about
> > how to have the pipeline pass 1K rows at a time to "writeSnowflake()"
> > instead of passing them one by one.
> > Maybe by using a GroupByKey transform with randomly sampled keys to
> > create chunks of the desired size? (Or can you think of a better way to
> > achieve this?)
> >
> > Thank you so much for all your help!
> >
> > On Wed, Jan 29, 2020 at 11:30 AM Brice Giesbrecht <[email protected]>
> > wrote:
> >>
> >> Lovely! You absolutely have the right idea.
> >>
> >> Your comments are spot on, but the suboptimal solution that works is
> >> usually preferable to the optimal one that doesn't (exist).
> >>
> >> I don't have any experience with the Snowflake connector, but if it can
> >> be kept around and reused and is slow or expensive to create, you may
> >> want to consider a wrapper/class to manage the client/connector that you
> >> can use across your pipeline steps if needed.
> >>
> >> Happy processing.
> >> -Brice
> >>
> >> On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz
> >> <[email protected]> wrote:
> >>>
> >>> Hi Brice,
> >>>
> >>> Thanks so much for your suggestion! I did the following and it seems
> >>> to work:
> >>>
> >>> class ReadSnowflake(beam.DoFn):
> >>>     def process(self, _):
> >>>         import snowflake.connector
> >>>         ctx = snowflake.connector.connect(...)
> >>>         cs = ctx.cursor()
> >>>         cs.execute('SELECT a, b FROM t')
> >>>         while True:
> >>>             data = cs.fetchmany(1000)
> >>>             if len(data) == 0:
> >>>                 break
> >>>             for d in data:
> >>>                 yield {'a': d[0], 'b': d[1]}
> >>>
> >>> pipeline = beam.Pipeline()
> >>> p = (
> >>>     pipeline
> >>>     | 'Empty start' >> beam.Create([''])
> >>>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
> >>>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
> >>>
> >>> I know this is not optimal, as it reads sequentially from Snowflake
> >>> (instead of in parallel, as I'm sure the BQ source does), but apart
> >>> from that, do you see any other problems (or possible improvements)
> >>> with this code?
> >>>
> >>> Thank you so much!
> >>>
> >>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <[email protected]>
> >>> wrote:
> >>>>
> >>>> I am using a pattern which I saw online (but can't seem to locate)
> >>>> which performs I/O in a DoFn using a standard Python client. I use
> >>>> this to read and write to Google Cloud Storage in streaming mode. You
> >>>> could use this idea to perform almost any I/O. Depending on your use
> >>>> case and workflow, this may be an approach you could consider. Shout
> >>>> if you need some boilerplate.
> >>>>
> >>>> It does look like native support is coming, and you know it is true
> >>>> as I read it on the internet. :)
> >>>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
> >>>>
> >>>> You could also set up an external service or endpoint to perform the
> >>>> query and read the results into your pipeline in a pipeline step,
> >>>> similar to the enrichment idea here:
> >>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
> >>>>
> >>>> And you could always write your own connector. Not a task to be taken
> >>>> too lightly, but it can be done.
> >>>>
> >>>> HTH
> >>>>
> >>>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz
> >>>> <[email protected]> wrote:
> >>>>>
> >>>>> Thanks for sharing this, Erik!
> >>>>>
> >>>>> It would be really nice/convenient to have a Python option to do
> >>>>> something like that. Our ML team is mostly a Python shop, and we are
> >>>>> also using Kubeflow Pipelines to orchestrate our ML pipelines (mostly
> >>>>> using their Python SDK to author them).
> >>>>>
> >>>>> Please let me know if you can think of any way we could do this with
> >>>>> Python.
> >>>>>
> >>>>> Thanks so much!
> >>>>>
> >>>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey
> >>>>> <[email protected]> wrote:
> >>>>>>
> >>>>>> You can use the JDBC driver. Here's a blog that describes JDBC
> >>>>>> usage in general:
> >>>>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
> >>>>>>
> >>>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz
> >>>>>> <[email protected]> wrote:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>> We are using Beam (and Dataflow) at my company and would like to
> >>>>>>> use it to read and write data from Snowflake.
> >>>>>>>
> >>>>>>> Does anybody know if there are any source/sink available for
> >>>>>>> Snowflake?
> >>>>>>>
> >>>>>>> If not, what would be the easiest way to create them? (Maybe there
> >>>>>>> is something for SQLAlchemy that we could leverage?)
> >>>>>>>
> >>>>>>> Thanks so much!
> >>>>>>>
> >>>>>>> Alan
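Brice's suggestion above about keeping the connector around, rather than reconnecting per element or per batch, maps naturally onto a DoFn's setup/teardown hooks in recent Beam Python releases. A rough sketch, reusing the same placeholder SNOWFLAKE_PARAMS and t(a, b) table as earlier:

    import apache_beam as beam

    SNOWFLAKE_PARAMS = {}  # placeholder connection settings

    class SnowflakeBatchWriter(beam.DoFn):
        def setup(self):
            # setup()/teardown() run once per DoFn instance, so the
            # (expensive) connection is created once and reused across bundles.
            import snowflake.connector
            self._ctx = snowflake.connector.connect(**SNOWFLAKE_PARAMS)

        def process(self, keyed_batch):
            _, rows = keyed_batch
            cursor = self._ctx.cursor()
            try:
                cursor.executemany(
                    'INSERT INTO t(a, b) VALUES (%(a)s, %(b)s)', list(rows))
            finally:
                cursor.close()

        def teardown(self):
            self._ctx.close()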

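On the point that the DoFn source reads sequentially: the trick from the JDBC post linked above also works here, namely splitting the query into ranges up front and letting a ParDo read each range in parallel. A sketch, assuming a numeric id column to split on; the shard boundaries, connection settings and output table are placeholders:

    import apache_beam as beam

    SNOWFLAKE_PARAMS = {}  # placeholder connection settings

    class ReadSnowflakeShard(beam.DoFn):
        def process(self, shard):
            import snowflake.connector
            lo, hi = shard
            ctx = snowflake.connector.connect(**SNOWFLAKE_PARAMS)
            try:
                cs = ctx.cursor()
                cs.execute(
                    'SELECT a, b FROM t WHERE id >= %(lo)s AND id < %(hi)s',
                    {'lo': lo, 'hi': hi})
                while True:
                    rows = cs.fetchmany(1000)
                    if not rows:
                        break
                    for a, b in rows:
                        yield {'a': a, 'b': b}
            finally:
                ctx.close()

    with beam.Pipeline() as pipeline:
        _ = (
            pipeline
            | 'Shard ranges' >> beam.Create(
                [(lo, lo + 100000) for lo in range(0, 1000000, 100000)])  # placeholder boundaries
            | 'Spread shards' >> beam.Reshuffle()
            | 'Read from Snowflake' >> beam.ParDo(ReadSnowflakeShard())
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                'project:dataset.table',  # placeholder; assumes the table already exists
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

The Reshuffle is what lets the shards fan out to different workers; in practice the boundaries would come from a quick MIN/MAX (or NTILE) query run before the pipeline is built.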