Note that the blog post at
https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
, while focused on the JdbcIO connector, still explains how to read from a
database in parallel. That approach applies equally to your code, because
nothing in it is specific to JDBC.
You'd need to pick a primary key that supports efficient range scans, split
its value space into ranges, and build a pipeline like:
(create key ranges) | reshuffle | (ReadSnowflake each range).
The reshuffle is what lets the runner spread the ranges across workers
instead of fusing them into a single sequential step.
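
A minimal sketch of that pipeline shape in Python, under some loud
assumptions: the table has an integer key column (called `id` here, which is
hypothetical), you already know its min and max values, and `conn_params` is
a placeholder dict holding whatever arguments you pass to
snowflake.connector.connect. The helper names are made up for illustration.

```python
def make_key_ranges(min_key, max_key, num_ranges):
    """Split [min_key, max_key] into contiguous half-open (lo, hi) ranges."""
    span = max_key - min_key + 1
    step = -(-span // num_ranges)  # ceiling division
    return [(lo, min(lo + step, max_key + 1))
            for lo in range(min_key, max_key + 1, step)]


def read_range(key_range, conn_params):
    """Read the rows of one key range; runs on whichever worker gets it."""
    import snowflake.connector  # imported lazily, on the worker
    lo, hi = key_range
    ctx = snowflake.connector.connect(**conn_params)
    try:
        cs = ctx.cursor()
        # `id` is an assumed key column; t, a and b come from the original code.
        cs.execute('SELECT a, b FROM t WHERE id >= %s AND id < %s', (lo, hi))
        while True:
            rows = cs.fetchmany(1000)
            if not rows:
                break
            for row in rows:
                yield {'a': row[0], 'b': row[1]}
    finally:
        ctx.close()


def build_read(pipeline, conn_params, min_key, max_key, num_ranges=100):
    """(create key ranges) | reshuffle | (read each range)."""
    import apache_beam as beam
    return (
        pipeline
        | 'Create key ranges' >> beam.Create(
            make_key_ranges(min_key, max_key, num_ranges))
        | 'Reshuffle' >> beam.Reshuffle()
        | 'Read each range' >> beam.FlatMap(read_range, conn_params))
```

The number of ranges is a tuning knob: more ranges give the runner more
units of work to balance, at the cost of more connections and queries.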

On Wed, Jan 29, 2020 at 11:01 AM Alan Krumholz <[email protected]>
wrote:

> Hi Brice,
>
> Thanks so much for your suggestion! I did the following and it seems to
> work:
>
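> import apache_beam as beam
>
> class ReadSnowflake(beam.DoFn):
>     def process(self, _):
>         # Import on the worker, where process() actually runs.
>         import snowflake.connector
>         ctx = snowflake.connector.connect(...)
>         cs = ctx.cursor()
>         cs.execute('SELECT a, b FROM t')
>         # Fetch in batches so the full result set is never held in memory.
>         while True:
>             data = cs.fetchmany(1000)
>             if not data:
>                 break
>             for d in data:
>                 yield {'a': d[0], 'b': d[1]}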
>
> pipeline = beam.Pipeline()
> p = (
>     pipeline
>     | 'Empty start' >> beam.Create([''])
>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
>
>
> I know this is not optimal as it is reading sequentially from snowflake
> (instead of doing it in parallel as I'm sure the BQ source does) but apart
> from that, do you see any other problems (or possible improvements) with
> this code?
>
>
> Thank you so much!
>
> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <[email protected]> wrote:
>
>> I am using a pattern which I saw online (but can't seem to locate) which
>> performs i/o in a DoFn using a standard python client. I use this to read
>> and write to Google cloud storage in streaming mode. You could use this
>> idea to perform almost any i/o. Depending on your use case and workflow,
>> this may be an approach you could consider. Shout if you need some
>> boilerplate.
>>
>> It does look like native support is coming and you know it is true as I
>> read it on the internet.    :)
>>
>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>
>>
>> You could also set up an external service or endpoint to perform the query
>> and read the results into your pipeline in a pipeline step similar to the
>> enrichment idea here:
>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>
>> And you could always write your own connector. Not a task to be taken too
>> lightly but it can be done.
>>
>> HTH
>>
>>
>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <[email protected]>
>> wrote:
>>
>>> Thanks for sharing this Erik!
>>>
>>> It would be really nice/convenient to have a python option to do
>>> something like that. Our ML team is mostly a python shop and we are also
>>> using kubeflow pipelines to orchestrate our ML pipelines (mostly using
>>> their python sdk to author these).
>>>
>>> Please let me know if you can think of any way we could do this with
>>> python.
>>>
>>> Thanks so much!
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
>>> [email protected]> wrote:
>>>
>>>> You can use the JDBC driver.  Here's a blog that describes JDBC usage
>>>> in general:
>>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>>
>>>>
>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi,
>>>>> We are using Beam (and Dataflow) at my company and would like to use
>>>>> it to read and write data from Snowflake.
>>>>>
>>>>> Does anybody know if there is a source/sink available for Snowflake?
>>>>>
>>>>> If not, what would be the easiest way to create one? (Maybe there is
>>>>> something for SQLAlchemy that we could leverage for that?)
>>>>>
>>>>>
>>>>> Thanks so much!
>>>>>
>>>>> Alan
>>>>>
>>>>
