Thanks Brice! I'll look into wrapping the connector.
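Something like this is what I had in mind for the wrapper (a rough, untested sketch; the class and method names are mine, with connection args elided as before):

```python
class SnowflakeClient:
    """Lazily creates and caches a single connection per worker process,
    so pipeline steps can reuse it instead of reconnecting per element."""
    _ctx = None

    @classmethod
    def cursor(cls):
        # Import inside the method so it happens on the Dataflow workers
        if cls._ctx is None:
            import snowflake.connector
            cls._ctx = snowflake.connector.connect(...)  # args elided
        return cls._ctx.cursor()
```

Does that sound like a reasonable direction?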

One more question: I'm now trying to develop a sink too. This is what I have:

def writeSnowflake(row):
    import snowflake.connector
    ctx = snowflake.connector.connect(...)
    cs = ctx.cursor()
    # Bind the values instead of formatting them into the SQL string;
    # the connector handles quoting/escaping for us
    cs.execute(
        'INSERT INTO t(a, b) VALUES (%s, %s)',
        (row['a'], row['b'])
    )
    cs.close()
    ctx.close()
    return row

pipeline = beam.Pipeline(...)
p = (
    pipeline
    | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
    | 'Write to Snowflake' >> beam.Map(writeSnowflake)
)

This seems to work (slowly), but it feels extremely inefficient to send an
INSERT query to the DW for each row in the dataset.
Is there an easy way to have the pipeline stack my data rows into
chunks of 1,000 so I can insert them in chunks instead? I'm mostly curious
about how to have the pipeline pass 1K rows at a time to "writeSnowflake()"
instead of passing them one by one.
Maybe by using a GroupByKey transform with randomly sampled keys
to create chunks of the desired size? (Or can you think of a better way
to achieve this?)
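For instance, I was imagining the pipeline could end up looking roughly like this (completely untested; beam.BatchElements and the batch writer below are my guesses, with connection args elided as before):

```python
# Sketch: beam.BatchElements groups elements into lists of ~1,000,
# so the sink receives one list per call instead of one row per call:
#
#   p = (
#       pipeline
#       | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
#       | 'Batch rows' >> beam.BatchElements(min_batch_size=1000,
#                                            max_batch_size=1000)
#       | 'Write to Snowflake' >> beam.Map(writeSnowflakeBatch)
#   )

def writeSnowflakeBatch(rows):
    """Receives a list of rows and issues a single batched INSERT."""
    import snowflake.connector
    ctx = snowflake.connector.connect(...)  # args elided
    cs = ctx.cursor()
    # executemany binds every tuple into one batched statement
    cs.executemany(
        'INSERT INTO t(a, b) VALUES (%s, %s)',
        [(row['a'], row['b']) for row in rows]
    )
    cs.close()
    ctx.close()
    return rows
```

Would that be the idiomatic way to do it, or is GroupByKey still the better tool here?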

Thank you so much for all your help!

On Wed, Jan 29, 2020 at 11:30 AM Brice Giesbrecht <[email protected]> wrote:

> Lovely! You absolutely have the right idea.
>
> Your comments are spot on but the suboptimal solution that works is
> usually preferable to the optimal one that doesn't (exist).
>
> I don't have any experience with the Snowflake connector but if it can be
> kept around and reused and is slow or expensive to create, you may want to
> consider a wrapper/class to manage the client/connector that you can use
> across your pipeline steps if needed.
>
> Happy processing.
> -Brice
>
> On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz <[email protected]>
> wrote:
>
>> Hi Brice,
>>
>> Thanks so much for your suggestion! I did the following and it seems to
>> work:
>>
>> class ReadSnowflake(beam.DoFn):
>>     def process(self, _):
>>         import snowflake.connector
>>         ctx = snowflake.connector.connect(...)
>>         cs = ctx.cursor()
>>         cs.execute('SELECT a, b FROM t')
>>         while True:
>>             data = cs.fetchmany(1000)
>>             if len(data) == 0:
>>                 break
>>             for d in data:
>>                 yield {'a':d[0], 'b':d[1]}
>>
>> pipeline = beam.Pipeline()
>> p = (
>>     pipeline
>>     | 'Empty start' >> beam.Create([''])
>>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
>>
>>
>> I know this is not optimal as it is reading sequentially from Snowflake
>> (instead of doing it in parallel as I'm sure the BQ source does), but apart
>> from that, do you see any other problems (or possible improvements) with
>> this code?
>>
>>
>> Thank you so much!
>>
>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <[email protected]>
>> wrote:
>>
>>> I am using a pattern which I saw online (but can't seem to locate) which
>>> performs i/o in a DoFn using a standard python client. I use this to read
>>> and write to Google cloud storage in streaming mode. You could use this
>>> idea to perform almost any i/o. Depending on your use case and workflow,
>>> this may be an approach you could consider. Shout if you need some
>>> boilerplate.
>>>
>>> It does look like native support is coming and you know it is true as I
>>> read it on the internet.    :)
>>>
>>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>>
>>>
>>> You could also set up an external service or endpoint to perform the
>>> query and read the results into your pipeline in a pipeline step similar to
>>> the enrichment idea here:
>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>
>>> And you could always write your own connector. Not a task to be taken
>>> too lightly but it can be done.
>>>
>>> HTH
>>>
>>>
>>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <
>>> [email protected]> wrote:
>>>
>>>> Thanks for sharing this Erik!
>>>>
>>>> It would be really nice/convenient to have a python option to do
>>>> something like that. Our ML team is mostly a python shop and we are also
>>>> using kubeflow pipelines to orchestrate our ML pipelines (mostly using
>>>> their python sdk to author these).
>>>>
>>>> Please let me know if you can think of any way we could do this with
>>>> python.
>>>>
>>>> Thanks so much!
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
>>>> [email protected]> wrote:
>>>>
>>>>> You can use the JDBC driver.  Here's a blog that describes JDBC usage
>>>>> in general:
>>>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>>>
>>>>>
>>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>> We are using beam and (dataflow) at my company and would like to use
>>>>>> it to read and write data from snowflake.
>>>>>>
>>>>>> Does anybody know if there are any source/sink available for
>>>>>> snowflake?
>>>>>>
>>>>>> if not, what would be the easiest way to create those? (maybe there
>>>>>> is something for sqlalchemy that we could leverage for that?)
>>>>>>
>>>>>>
>>>>>> Thanks so much!
>>>>>>
>>>>>> Alan
>>>>>>
>>>>>