You could use the beam.BatchElements transform (it lives in
apache_beam.transforms.util) to batch rows into larger chunks.
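
For reference, here's a minimal sketch of the batched-write pattern (the
table/column names are just your example ones, and the stand-in cursor is
only there so the snippet is self-contained; a real pipeline would use
snowflake.connector.connect(...).cursor(), whose executemany accepts a
parametrized statement plus a sequence of parameter tuples):

```python
# Upstream, BatchElements groups single rows into lists, e.g.:
#
#   | 'Batch rows' >> beam.BatchElements(min_batch_size=500, max_batch_size=1000)
#   | 'Write to Snowflake' >> beam.Map(write_snowflake_batch)
#
# so the write step receives a list of rows instead of one row at a time.

def write_snowflake_batch(rows, cursor):
    """Insert a whole batch of rows with one parametrized executemany call."""
    # Parameter binding (%s placeholders) replaces per-row string
    # formatting and sidesteps quoting/SQL-injection issues.
    cursor.executemany(
        'INSERT INTO t(a, b) VALUES (%s, %s)',
        [(row['a'], row['b']) for row in rows],
    )
    return rows

# Stand-in cursor so the sketch runs on its own; it only records calls.
class FakeCursor:
    def __init__(self):
        self.calls = []

    def executemany(self, sql, seq_of_params):
        self.calls.append((sql, list(seq_of_params)))

cursor = FakeCursor()
batch = [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
write_snowflake_batch(batch, cursor)
print(len(cursor.calls))  # → 1: one INSERT round trip for the whole batch
```

One round trip per batch instead of per row is usually the big win here;
tuning min/max batch size is then just a throughput/latency trade-off.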

On Wed, Jan 29, 2020 at 12:01 PM Alan Krumholz
<[email protected]> wrote:
>
> Thanks Brice! I'll look into wrapping the connector.
>
> One more question.
> I'm trying now to develop a sink too. This is what I have:
>
> def writeSnowflake(row):
>     import snowflake.connector
>     ctx = snowflake.connector.connect(...)
>     cs = ctx.cursor()
>     try:
>         # Parameter binding instead of string formatting avoids
>         # quoting problems and SQL injection.
>         cs.execute(
>             'INSERT INTO t(a, b) VALUES (%s, %s)',
>             (row['a'], row['b'])
>         )
>     finally:
>         cs.close()
>         ctx.close()
>     return row
>
> pipeline = beam.Pipeline(...)
> p = (
>     pipeline
>     | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
>     | 'Write to SnowFlake' >> beam.Map(writeSnowflake)
> )
>
> This seems to (slowly) work but it feels extremely inefficient to send an 
> INSERT query to the DW for each row in the dataset.
> Is there an easy way to have the pipeline group my rows into chunks of 
> 1,000 so I can insert them in batches instead? I'm mostly curious about 
> how to have the pipeline pass 1K rows at a time to "writeSnowflake()" 
> instead of passing them one by one.
> Maybe by using a GroupByKey transformation and using randomly sampled keys to 
> create the chunks of the desired size? (or can you think of a better way to 
> achieve this?)
>
> Thank you so much for all your help!
>
> On Wed, Jan 29, 2020 at 11:30 AM Brice Giesbrecht <[email protected]> wrote:
>>
>> Lovely! You absolutely have the right idea.
>>
>> Your comments are spot on but the suboptimal solution that works is usually 
>> preferable to the optimal one that doesn't (exist).
>>
>> I don't have any experience with the Snowflake connector but if it can be 
>> kept around and reused and is slow or expensive to create, you may want to 
>> consider a wrapper/class to manage the client/connector that you can use 
>> across your pipeline steps if needed.
>>
>> Happy processing.
>> -Brice
>>
>> On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz <[email protected]> 
>> wrote:
>>>
>>> Hi Brice,
>>>
>>> Thanks so much for your suggestion! I did the following and it seems to 
>>> work:
>>>
>>> class ReadSnowflake(beam.DoFn):
>>>     def process(self, _):
>>>         import snowflake.connector
>>>         ctx = snowflake.connector.connect(...)
>>>         cs = ctx.cursor()
>>>         cs.execute('SELECT a, b FROM t')
>>>         while True:
>>>             data = cs.fetchmany(1000)
>>>             if len(data) == 0:
>>>                 break
>>>             for d in data:
>>>                 yield {'a': d[0], 'b': d[1]}
>>>
>>> pipeline = beam.Pipeline()
>>> p = (
>>>     pipeline
>>>     | 'Empty start' >> beam.Create([''])
>>>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>>>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
>>>
>>>
>>> I know this is not optimal as it reads sequentially from Snowflake 
>>> (instead of in parallel, as I'm sure the BQ source does), but apart 
>>> from that, do you see any other problems (or possible improvements) 
>>> with this code?
>>>
>>>
>>> Thank you so much!
>>>
>>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <[email protected]> wrote:
>>>>
>>>> I am using a pattern which I saw online (but can't seem to locate) which 
>>>> performs i/o in a DoFn using a standard python client. I use this to read 
>>>> and write to Google cloud storage in streaming mode. You could use this 
>>>> idea to perform almost any i/o. Depending on your use case and workflow, 
>>>> this may be an approach you could consider. Shout if you need some 
>>>> boilerplate.
>>>>
>>>> It does look like native support is coming and you know it is true as I 
>>>> read it on the internet.    :)
>>>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>>>
>>>> You could also set up an external service or endpoint to perform the query 
>>>> and read the results into your pipeline in a pipeline step similar to the 
>>>> enrichment idea here: 
>>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>>
>>>> And you could always write your own connector. Not a task to be taken too 
>>>> lightly but it can be done.
>>>>
>>>> HTH
>>>>
>>>>
>>>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <[email protected]> 
>>>> wrote:
>>>>>
>>>>> Thanks for sharing this Erik!
>>>>>
>>>>> It would be really nice/convenient to have a python option to do 
>>>>> something like that. Our ML team is mostly a python shop and we are also 
>>>>> using kubeflow pipelines to orchestrate our ML pipelines (mostly using 
>>>>> their python sdk to author these).
>>>>>
>>>>> Please let me know if you can think of any way we could do this with 
>>>>> python.
>>>>>
>>>>> Thanks so much!
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey 
>>>>> <[email protected]> wrote:
>>>>>>
>>>>>> You can use the JDBC driver.  Here's a blog that describes JDBC usage in 
>>>>>> general:   
>>>>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>>>>
>>>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz 
>>>>>> <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>> We are using beam and (dataflow) at my company and would like to use it 
>>>>>>> to read and write data from snowflake.
>>>>>>>
>>>>>>> Does anybody know if there are any source/sink available for snowflake?
>>>>>>>
>>>>>>> if not, what would be the easiest way to create those? (maybe there is 
>>>>>>> something for sqlalchemy that we could leverage for that?)
>>>>>>>
>>>>>>>
>>>>>>> Thanks so much!
>>>>>>>
>>>>>>> Alan