@Eugene Kirpichov <[email protected]> that pattern would be a nice one
to add to the Apache Beam patterns page:

https://beam.apache.org/documentation/patterns/overview/

I raised a Jira issue; it would be a nice first PR for anyone wanting to
contribute to Beam :D

https://issues.apache.org/jira/browse/BEAM-9222
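
For anyone picking this up, the pattern Eugene describes in the quoted message
(create key ranges | reshuffle | read each range) might look roughly like the
sketch below. This is only an illustration under assumptions, not a tested
connector: the integer key column `id`, the `connect_kwargs` dict, and the
helper names `make_key_ranges`, `read_range` and `build_read` are all
hypothetical and do not come from the thread. The table `t` and columns `a`,
`b` are taken from Alan's example.

```python
def make_key_ranges(min_key, max_key, num_ranges):
    """Split the inclusive span [min_key, max_key] into half-open (lo, hi) ranges."""
    if num_ranges < 1 or max_key < min_key:
        raise ValueError('need num_ranges >= 1 and max_key >= min_key')
    total = max_key - min_key + 1
    step = -(-total // num_ranges)  # ceiling division
    return [(lo, min(lo + step, max_key + 1))
            for lo in range(min_key, max_key + 1, step)]


def read_range(key_range, connect_kwargs):
    """Yield rows whose key falls in one (lo, hi) range.

    Assumes table t has an integer column `id` (hypothetical).
    """
    # Imported here so the module is resolved on the worker, as in the
    # original DoFn.
    import snowflake.connector
    lo, hi = key_range
    ctx = snowflake.connector.connect(**connect_kwargs)
    try:
        cs = ctx.cursor()
        # The connector's default pyformat paramstyle binds %s placeholders.
        cs.execute('SELECT a, b FROM t WHERE id >= %s AND id < %s', (lo, hi))
        while True:
            data = cs.fetchmany(1000)
            if not data:
                break
            for d in data:
                yield {'a': d[0], 'b': d[1]}
    finally:
        ctx.close()


def build_read(pipeline, connect_kwargs, min_key, max_key, num_ranges=16):
    """Wire up: (create key ranges) | reshuffle | (read each range)."""
    # Imported lazily; Beam is only needed when the pipeline is built.
    import apache_beam as beam
    return (
        pipeline
        | 'Create key ranges' >> beam.Create(
            make_key_ranges(min_key, max_key, num_ranges))
        | 'Reshuffle' >> beam.Reshuffle()
        | 'Read each range' >> beam.FlatMap(read_range, connect_kwargs)
    )
```

For example, make_key_ranges(0, 9, 3) gives [(0, 4), (4, 8), (8, 10)]. The
reshuffle step is what lets the runner spread the ranges across workers
instead of fusing them into the single step that created them.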

On Thu, 30 Jan 2020 at 05:16, Eugene Kirpichov <[email protected]> wrote:

> Note that
> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
> , while focused on the JdbcIO connector, still explains how to read from a
> database in parallel - that solution would equally apply to your code,
> because it is not specific to JDBC.
> You'd need to decide on a primary key that supports efficient range scans,
> and create a pipeline like: (create key ranges) | reshuffle |
> (ReadSnowflake each range).
>
> On Wed, Jan 29, 2020 at 11:01 AM Alan Krumholz <[email protected]>
> wrote:
>
>> Hi Brice,
>>
>> Thanks so much for your suggestion! I did the following and it seems to
>> work:
>>
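>> import apache_beam as beam
>>
>> class ReadSnowflake(beam.DoFn):
>>     def process(self, _):
>>         # Import inside process so the module is resolved on the worker.
>>         import snowflake.connector
>>         ctx = snowflake.connector.connect(...)
>>         try:
>>             cs = ctx.cursor()
>>             cs.execute('SELECT a, b FROM t')
>>             while True:
>>                 # Fetch in batches to bound memory use.
>>                 data = cs.fetchmany(1000)
>>                 if not data:
>>                     break
>>                 for d in data:
>>                     yield {'a': d[0], 'b': d[1]}
>>         finally:
>>             # Release the connection even if the read fails.
>>             ctx.close()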
>>
>> pipeline = beam.Pipeline()
>> p = (
>>     pipeline
>>     | 'Empty start' >> beam.Create([''])
>>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
>>
>>
>> I know this is not optimal, as it reads sequentially from Snowflake
>> (instead of in parallel, as I'm sure the BQ source does), but apart
>> from that, do you see any other problems (or possible improvements) with
>> this code?
>>
>>
>> Thank you so much!
>>
>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <[email protected]>
>> wrote:
>>
>>> I am using a pattern which I saw online (but can't seem to locate) which
>>> performs I/O in a DoFn using a standard Python client. I use this to read
>>> and write to Google Cloud Storage in streaming mode. You could use this
>>> idea to perform almost any I/O. Depending on your use case and workflow,
>>> this may be an approach you could consider. Shout if you need some
>>> boilerplate.
>>>
>>> It does look like native support is coming, and you know it is true as I
>>> read it on the internet. :)
>>>
>>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>>
>>>
>>> You could also set up an external service or endpoint to perform the
>>> query and read the results into your pipeline in a pipeline step similar to
>>> the enrichment idea here:
>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>
>>> And you could always write your own connector. Not a task to be taken
>>> too lightly but it can be done.
>>>
>>> HTH
>>>
>>>
>>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <
>>> [email protected]> wrote:
>>>
>>>> Thanks for sharing this Erik!
>>>>
>>>> It would be really nice/convenient to have a Python option to do
>>>> something like that. Our ML team is mostly a Python shop and we are also
>>>> using Kubeflow Pipelines to orchestrate our ML pipelines (mostly using
>>>> their Python SDK to author these).
>>>>
>>>> Please let me know if you can think of any way we could do this with
>>>> Python.
>>>>
>>>> Thanks so much!
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
>>>> [email protected]> wrote:
>>>>
>>>>> You can use the JDBC driver.  Here's a blog that describes JDBC usage
>>>>> in general:
>>>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>>>
>>>>>
>>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>> We are using Beam (and Dataflow) at my company and would like to use
>>>>>> it to read and write data from Snowflake.
>>>>>>
>>>>>> Does anybody know if there is a source/sink available for
>>>>>> Snowflake?
>>>>>>
>>>>>> If not, what would be the easiest way to create those? (Maybe there
>>>>>> is something for SQLAlchemy that we could leverage for that?)
>>>>>>
>>>>>>
>>>>>> Thanks so much!
>>>>>>
>>>>>> Alan
>>>>>>
>>>>>
