At first glance, I think this could even be implemented and encapsulated inside 
JdbcIO, e.g. as JdbcIO.readParallel(), to help users read in parallel (under 
some conditions and limitations, of course) if that fits their needs. 
Wdyt? Eugene, Reza?
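
For reference, a minimal Python sketch of the pattern Eugene describes in the
quoted message, (create key ranges) | reshuffle | (read each range), might look
like the following. It assumes an integer key column, hypothetically named `id`,
on the table `t` from Alan's example. The helper names make_key_ranges,
range_query and build_pipeline are made up for illustration, and
read_range_dofn stands in for a DoFn like ReadSnowflake that runs
range_query(element) instead of the full-table query.

```python
def make_key_ranges(min_key, max_key, num_ranges):
    """Split the inclusive integer interval [min_key, max_key] into
    half-open (start, end) ranges that cover every key exactly once.

    May return fewer than num_ranges ranges when there are few keys.
    """
    if num_ranges < 1 or max_key < min_key:
        raise ValueError("need num_ranges >= 1 and min_key <= max_key")
    total = max_key - min_key + 1
    step = -(-total // num_ranges)  # ceiling division
    end = max_key + 1
    return [(start, min(start + step, end))
            for start in range(min_key, end, step)]


def range_query(key_range):
    """Build the query for one range.

    Table `t` and key column `id` are assumptions. The bounds are integers
    produced by make_key_ranges, so plain formatting is safe here.
    """
    start, end = key_range
    return f"SELECT a, b FROM t WHERE id >= {start} AND id < {end}"


def build_pipeline(pipeline, read_range_dofn, min_key, max_key, num_ranges=16):
    """Wire up: (create key ranges) | reshuffle | (read each range)."""
    # Imported lazily so the helpers above can be used without Beam installed.
    import apache_beam as beam

    return (
        pipeline
        | "Create key ranges" >> beam.Create(
            make_key_ranges(min_key, max_key, num_ranges))
        # Reshuffle lets the runner spread the ranges across workers
        # instead of fusing them into the single Create step.
        | "Reshuffle" >> beam.Reshuffle()
        | "Read each range" >> beam.ParDo(read_range_dofn)
    )
```

The ranges are half-open so that adjacent ranges never overlap; how many
ranges to create depends on the table size and the number of workers.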

> On 30 Jan 2020, at 03:28, Reza Rokni <[email protected]> wrote:
> 
> @Eugene Kirpichov, that pattern would be a nice 
> one to add to the Apache Beam patterns page:
> 
> https://beam.apache.org/documentation/patterns/overview/ 
> 
> Raised a Jira; it would be a nice first PR for anyone wanting to contribute 
> to Beam :D
> 
> https://issues.apache.org/jira/browse/BEAM-9222
> 
> On Thu, 30 Jan 2020 at 05:16, Eugene Kirpichov <[email protected]> wrote:
> Note that the blog post at
> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
> though focused on the JdbcIO connector, still explains how to read from a 
> database in parallel - that solution would equally apply to your code, 
> because it is not specific to JDBC.
> You'd need to decide on a primary key that supports efficient range scans, 
> and create a pipeline like: (create key ranges) | reshuffle | (ReadSnowflake 
> each range).
> 
> On Wed, Jan 29, 2020 at 11:01 AM Alan Krumholz <[email protected]> wrote:
> Hi Brice,
> 
> Thanks so much for your suggestion! I did the following and it seems to work:
> 
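> import apache_beam as beam
> 
> class ReadSnowflake(beam.DoFn):
>     def process(self, _):
>         # Imported here so the module is resolved on the worker.
>         import snowflake.connector
>         ctx = snowflake.connector.connect(...)
>         try:
>             cs = ctx.cursor()
>             cs.execute('SELECT a, b FROM t')
>             while True:
>                 # Fetch in batches to bound memory use.
>                 data = cs.fetchmany(1000)
>                 if not data:
>                     break
>                 for d in data:
>                     yield {'a': d[0], 'b': d[1]}
>         finally:
>             # Close the connection even if the query fails.
>             ctx.close()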
> 
> pipeline = beam.Pipeline()
> p = (
>     pipeline
>     | 'Empty start' >> beam.Create([''])
>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
> # Nothing executes until the pipeline is run.
> pipeline.run()
> 
> 
> I know this is not optimal, as it reads sequentially from Snowflake 
> (instead of in parallel, as I'm sure the BQ source does), but apart 
> from that, do you see any other problems (or possible improvements) with this 
> code?
> 
> 
> Thank you so much!
> 
> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <[email protected]> wrote:
> I am using a pattern which I saw online (but can't seem to locate) which 
> performs I/O in a DoFn using a standard Python client. I use this to read and 
> write to Google Cloud Storage in streaming mode. You could use this idea to 
> perform almost any I/O. Depending on your use case and workflow, this may be 
> an approach you could consider. Shout if you need some boilerplate.
> 
> It does look like native support is coming and you know it is true as I read 
> it on the internet.    :)
> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
> 
> You could also set up an external service or endpoint to perform the query and 
> read the results into your pipeline in a pipeline step, similar to the 
> enrichment idea here: 
> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
> 
> And you could always write your own connector. Not a task to be taken too 
> lightly but it can be done.
> 
> HTH
> 
> 
> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <[email protected]> wrote:
> Thanks for sharing this Erik!
> 
> It would be really nice/convenient to have a Python option to do something 
> like that. Our ML team is mostly a Python shop and we are also using Kubeflow 
> Pipelines to orchestrate our ML pipelines (mostly using their Python SDK to 
> author these).
> 
> Please let me know if you can think of any way we could do this with Python.
> 
> Thanks so much!
> 
> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <[email protected]> wrote:
> You can use the JDBC driver. Here's a blog post that describes JDBC usage in 
> general: 
> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
> 
> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <[email protected]> wrote:
> Hi,
> We are using Beam (and Dataflow) at my company and would like to use it to 
> read data from and write data to Snowflake.
> 
> Does anybody know if there is a source/sink available for Snowflake?
> 
> If not, what would be the easiest way to create one? (Maybe there is 
> something for SQLAlchemy that we could leverage for that?)
> 
> 
> Thanks so much!
> 
> Alan
