Hi Brice,
Thanks so much for your suggestion! I did the following and it seems to
work:
class ReadSnowflake(beam.DoFn):
    def process(self, _):
        # Imported inside process() so the dependency only needs to exist
        # on the workers.
        import snowflake.connector
        ctx = snowflake.connector.connect(...)
        cs = ctx.cursor()
        cs.execute('SELECT a, b FROM t')
        while True:
            data = cs.fetchmany(1000)
            if len(data) == 0:
                break
            for d in data:
                yield {'a': d[0], 'b': d[1]}

pipeline = beam.Pipeline()
p = (
    pipeline
    | 'Empty start' >> beam.Create([''])
    | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
I know this is not optimal, as it reads sequentially from Snowflake
(instead of in parallel, as I'm sure the BQ source does), but apart from
that, do you see any other problems (or possible improvements) with this
code?
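(Sketching one possible improvement to the sequential read: the query could be split into non-overlapping shards up front, so that several DoFn instances each read one slice in parallel. The helper below is a hypothetical illustration; the column name, bounds, and shard count are made up, and in practice you'd need to know or query the real min/max of the shard column first.)

```python
# Hypothetical sketch: build non-overlapping WHERE-clause predicates so
# several DoFn instances can each read one slice of the table in parallel.
def make_shards(column, min_val, max_val, num_shards):
    """Split the inclusive range [min_val, max_val] on `column` into
    non-overlapping BETWEEN predicates, one per shard."""
    step = (max_val - min_val + 1) // num_shards or 1
    shards = []
    lo = min_val
    for i in range(num_shards):
        # The last shard absorbs any remainder so the full range is covered.
        hi = max_val if i == num_shards - 1 else lo + step - 1
        shards.append('%s BETWEEN %d AND %d' % (column, lo, hi))
        lo = hi + 1
        if lo > max_val:
            break
    return shards
```

The pipeline would then start from something like beam.Create(make_shards('a', 0, 999999, 10)) instead of beam.Create(['']), and the DoFn would append ' WHERE ' plus its shard predicate to the query, so each worker reads only its slice.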
Thank you so much!
On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <[email protected]> wrote:
> I am using a pattern I saw online (but can't seem to locate) which
> performs I/O in a DoFn using a standard Python client. I use this to read
> and write to Google Cloud Storage in streaming mode. You could use this
> idea to perform almost any I/O. Depending on your use case and workflow,
> this may be an approach you could consider. Shout if you need some
> boilerplate.
>
> It does look like native support is coming and you know it is true as I
> read it on the internet. :)
>
> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>
>
> You could also set up an external service or endpoint to perform the query
> and read the results into your pipeline in a pipeline step similar to the
> enrichment idea here:
> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>
> And you could always write your own connector. Not a task to be taken too
> lightly but it can be done.
>
> HTH
>
>
> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <[email protected]>
> wrote:
>
>> Thanks for sharing this Erik!
>>
>> It would be really nice/convenient to have a Python option to do
>> something like that. Our ML team is mostly a Python shop, and we are also
>> using Kubeflow Pipelines to orchestrate our ML pipelines (mostly using
>> their Python SDK to author these).
>>
>> Please let me know if you can think of any way we could do this with
>> python.
>>
>> Thanks so much!
>>
>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
>> [email protected]> wrote:
>>
>>> You can use the JDBC driver. Here's a blog that describes JDBC usage in
>>> general:
>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>
>>>
>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <
>>> [email protected]> wrote:
>>>
>>>> Hi,
>>>> We are using Beam (and Dataflow) at my company and would like to use it
>>>> to read and write data from Snowflake.
>>>>
>>>> Does anybody know if there are any sources/sinks available for Snowflake?
>>>>
>>>> If not, what would be the easiest way to create them? (Maybe there is
>>>> something for SQLAlchemy that we could leverage for that?)
>>>>
>>>>
>>>> Thanks so much!
>>>>
>>>> Alan
>>>>
>>>