Hi Robert, beam.util.GroupIntoBatches(x) works perfectly. Thanks so much!
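In case it's useful to anyone else reading this later, here is a minimal sketch of turning each batch into a single parameterized INSERT. The helper name is hypothetical and the table/columns just mirror the toy example below; the only claim is the general shape: one bound statement per batch rather than one per row.

```python
def build_batch_insert(rows):
    # Hypothetical helper: build one parameterized INSERT statement for a
    # whole batch of row dicts, instead of one statement per row.
    # Placeholders keep values out of the SQL string itself.
    placeholders = ', '.join(['(%s, %s)'] * len(rows))
    sql = 'INSERT INTO t(a, b) VALUES ' + placeholders
    params = [value for row in rows for value in (row['a'], row['b'])]
    return sql, params

sql, params = build_batch_insert([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}])
# sql    == 'INSERT INTO t(a, b) VALUES (%s, %s), (%s, %s)'
# params == [1, 2, 3, 4]
```

The batched write step would then receive the (key, rows) pairs that GroupIntoBatches emits and run something like cursor.execute(sql, params) once per 1,000-row batch instead of once per row.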

On Wed, Jan 29, 2020 at 12:58 PM Robert Bradshaw <[email protected]>
wrote:

> You could use the beam.util.BatchElements transform to batch rows into
> larger chunks.
>
> On Wed, Jan 29, 2020 at 12:01 PM Alan Krumholz
> <[email protected]> wrote:
> >
> > Thanks Brice! I'll look into wrapping the connector.
> >
> > One more question.
> > I'm trying now to develop a sink too. This is what I have:
> >
> > def writeSnowflake(row):
> >     import snowflake.connector
> >     ctx = snowflake.connector.connect(...)
> >     cs = ctx.cursor()
> >     # Bind values as query parameters instead of formatting them into
> >     # the SQL string, so non-numeric values are quoted correctly.
> >     cs.execute(
> >         'INSERT INTO t(a, b) VALUES (%s, %s)',
> >         (row['a'], row['b'])
> >     )
> >     return row
> >
> > pipeline = beam.Pipeline(...)
> > p = (
> >     pipeline
> >     | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
> >     | 'Write to SnowFlake' >> beam.Map(writeSnowflake)
> > )
> >
> > This seems to (slowly) work, but it feels extremely inefficient to send
> an INSERT query to the DW for each row in the dataset.
> > Is there an easy way to have the pipeline batch my rows into chunks of
> 1,000 so I can insert them chunk by chunk instead? I'm mostly curious how
> to have the pipeline pass 1K rows at a time to "writeSnowflake()" instead
> of passing them one by one.
> > Maybe by using a GroupByKey transform with randomly sampled keys to
> create chunks of the desired size? (Or can you think of a better way to
> achieve this?)
> >
> > Thank you so much for all your help!
> >
> > On Wed, Jan 29, 2020 at 11:30 AM Brice Giesbrecht <[email protected]>
> wrote:
> >>
> >> Lovely! You absolutely have the right idea.
> >>
> >> Your comments are spot on but the suboptimal solution that works is
> usually preferable to the optimal one that doesn't (exist).
> >>
> >> I don't have any experience with the Snowflake connector, but if it
> can be kept around and reused, and is slow or expensive to create, you may
> want to consider a wrapper/class to manage the client/connector that you
> can reuse across your pipeline steps if needed.
> >>
> >> Happy processing.
> >> -Brice
> >>
> >> On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz <
> [email protected]> wrote:
> >>>
> >>> Hi Brice,
> >>>
> >>> Thanks so much for your suggestion! I did the following and it seems
> to work:
> >>>
> >>> class ReadSnowflake(beam.DoFn):
> >>>     def process(self, _):
> >>>         import snowflake.connector
> >>>         ctx = snowflake.connector.connect(...)
> >>>         cs = ctx.cursor()
> >>>         cs.execute('SELECT a, b FROM t')
> >>>         while True:
> >>>             data = cs.fetchmany(1000)
> >>>             if not data:
> >>>                 break
> >>>             for d in data:
> >>>                 yield {'a':d[0], 'b':d[1]}
> >>>
> >>> pipeline = beam.Pipeline()
> >>> p = (
> >>>     pipeline
> >>>     | 'Empty start' >> beam.Create([''])
> >>>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
> >>>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
> >>>
> >>>
> >>> I know this is not optimal, as it reads sequentially from
> Snowflake (instead of in parallel, as I'm sure the BQ source does), but
> apart from that, do you see any other problems (or possible
> improvements) with this code?
> >>>
> >>>
> >>> Thank you so much!
> >>>
> >>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <[email protected]>
> wrote:
> >>>>
> >>>> I am using a pattern I saw online (but can't seem to locate) that
> performs I/O in a DoFn using a standard Python client. I use this to
> read and write to Google Cloud Storage in streaming mode. You could use
> this idea to perform almost any I/O. Depending on your use case and
> workflow, this may be an approach worth considering. Shout if you need
> some boilerplate.
> >>>>
> >>>> It does look like native support is coming and you know it is true as
> I read it on the internet.    :)
> >>>>
> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
> >>>>
> >>>> You could also set up an external service or endpoint to perform the
> query and read the results into your pipeline in a pipeline step, similar
> to the enrichment idea here:
> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
> >>>>
> >>>> And you could always write your own connector. Not a task to be taken
> too lightly but it can be done.
> >>>>
> >>>> HTH
> >>>>
> >>>>
> >>>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <
> [email protected]> wrote:
> >>>>>
> >>>>> Thanks for sharing this Erik!
> >>>>>
> >>>>> It would be really nice/convenient to have a Python option to do
> something like that. Our ML team is mostly a Python shop, and we are also
> using Kubeflow Pipelines to orchestrate our ML pipelines (mostly using
> its Python SDK to author these).
> >>>>>
> >>>>> Please let me know if you can think of any way we could do this with
> Python.
> >>>>>
> >>>>> Thanks so much!
> >>>>>
> >>>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
> [email protected]> wrote:
> >>>>>>
> >>>>>> You can use the JDBC driver.  Here's a blog that describes JDBC
> usage in general:
> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
> >>>>>>
> >>>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <
> [email protected]> wrote:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>> We are using Beam (and Dataflow) at my company and would like to
> use it to read and write data from Snowflake.
> >>>>>>>
> >>>>>>> Does anybody know if there are any source/sink available for
> Snowflake?
> >>>>>>>
> >>>>>>> If not, what would be the easiest way to create those? (Maybe
> there is something for SQLAlchemy that we could leverage for that?)
> >>>>>>>
> >>>>>>>
> >>>>>>> Thanks so much!
> >>>>>>>
> >>>>>>> Alan
>
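For completeness, here is a sketch of the wrapper/class Brice suggests above for reusing a slow-to-create client across steps. Everything here is illustrative: `connect_fn` merely stands in for something like snowflake.connector.connect, and the class name is made up.

```python
class ReusableConnection:
    """Illustrative connection wrapper: open an expensive client once,
    hand out the same instance on every call, and close it explicitly."""

    def __init__(self, connect_fn):
        self._connect_fn = connect_fn  # e.g. a snowflake connect callable
        self._conn = None

    def get(self):
        # Lazily connect on first use; reuse the same connection afterwards.
        if self._conn is None:
            self._conn = self._connect_fn()
        return self._conn

    def close(self):
        # Close and forget the connection so a later get() reconnects.
        if self._conn is not None:
            self._conn.close()
            self._conn = None
```

In a Beam DoFn you would typically call get() from process() (or from the DoFn's setup hook) and close() from teardown, so each DoFn instance opens one connection instead of one per element.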
