I mean that with many threads, Beam will create many connections (per thread, per query). Let's say I have 10 different tables. For each table insert, Beam will create its own connection, and on top of that one for each thread running that insert.
Let's say I have some UUID generation and a BQ insert. If there are problems with the BQ service, an exception will be thrown, but my job will be restored from the latest checkpoint, so I will not generate a UUID for the same message twice. With JdbcIO it is possible to get a UUID for the same message twice (with multiple IOs this might be a problem).

Aleksandr.

On 14 March 2018 at 10:37 PM, "Eugene Kirpichov" <[email protected]> wrote:

Aleksandr - it seems that you're assuming that every prepared statement uses a connection. This is not the case: we open a connection, and use that connection to create prepared statements. For any given thread, there's at most 1 connection open at the same time, and the connection has at most 1 prepared statement open.

Create thread -> (open connection -> (open prepared statement -> executeBatch* -> close prepared statement)* -> close connection)*

I'm not sure what you mean by checkpoints, can you elaborate?

On Wed, Mar 14, 2018 at 1:20 PM Aleksandr <[email protected]> wrote:

> So let's say I have 10 prepared statements and hundreds of threads, for
> example 300. Dataflow will create 3000 connections to SQL, and in case of
> autoscaling another node will again create 3000 connections?
>
> Another problem here is that JdbcIO doesn't use any checkpoints (and BQ,
> for example, does). So every connection exception will be propagated upward.
>
> On 14 March 2018 at 10:09 PM, "Eugene Kirpichov" <[email protected]> wrote:
>
> In a streaming job it'll be roughly once per thread per worker, and
> Dataflow Streaming runner may create hundreds of threads per worker because
> it assumes that they are not heavyweight and that low latency is the
> primary goal rather than high throughput (as in batch runner).
>
> A hacky way to limit this parallelism would be to emulate the
> "repartition", by inserting a chain of transforms: pair with a random key
> in [0,n), group by key, ungroup - processing of the result until the next
> GBK will not be parallelized more than n-wise in practice in the Dataflow
> streaming runner, so in the particular case of JdbcIO.write() with its
> current implementation it should help. It may break in the future, e.g. if
> JdbcIO.write() ever changes to include a GBK before writing. Unfortunately
> I can't recommend a long-term reliable solution for the moment.
>
> On Wed, Mar 14, 2018 at 12:57 PM Aleksandr <[email protected]> wrote:
>
>> Hello,
>> How many times will the setup be called per node? Is it possible to limit
>> ParDo instances in Google Dataflow?
>>
>> Aleksandr.
>>
>> On 14 March 2018 at 9:22 PM, "Eugene Kirpichov" <[email protected]> wrote:
>>
>> "JdbcIO will create a new connection for each prepared statement" - this is
>> not the case: the connection is created in @Setup and deleted in @Teardown.
>> https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L503
>> https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L631
>>
>> Something else must be going wrong.
>>
>> On Wed, Mar 14, 2018 at 12:11 PM Aleksandr <[email protected]>
>> wrote:
>>
>>> Hello, we had a similar problem. The current JdbcIO will cause a lot of
>>> connection errors.
>>>
>>> Typically you have more than one prepared statement. JdbcIO will create
>>> a new connection for each prepared statement (and close it only in
>>> teardown), so the connection may time out, or in case of autoscaling
>>> you will get too many connections to SQL.
>>> Our solution was to create a connection pool in setup, then get a
>>> connection and return it to the pool in processElement.
>>>
>>> Best Regards,
>>> Aleksandr Gortujev.
>>>
>>> On 14 March 2018 at 8:52 PM, "Jean-Baptiste Onofré" <[email protected]> wrote:
>>>
>>> Agree, especially using the current JdbcIO impl that creates the connection
>>> in the @Setup. Or does it mean that @Teardown is never called?
>>>
>>> Regards
>>> JB
>>> On 14 March 2018 at 11:40, Eugene Kirpichov <[email protected]> wrote:
>>>>
>>>> Hi Derek - could you explain where the "3000 connections" number
>>>> comes from, i.e. how did you measure it? It's weird that 5-6 workers would
>>>> use 3000 connections.
>>>>
>>>> On Wed, Mar 14, 2018 at 3:50 AM Derek Chan <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We are new to Beam and need some help.
>>>>>
>>>>> We are working on a flow that ingests events and writes the aggregated
>>>>> counts to a database. The input rate is rather low (~2000 messages per
>>>>> sec), but the processing is relatively heavy, so we need to scale out
>>>>> to 5~6 nodes. The output (via JDBC) is aggregated, so the volume is
>>>>> also low. But because of the number of workers, it keeps 3000
>>>>> connections to the database and it keeps hitting the database
>>>>> connection limits.
>>>>>
>>>>> Is there a way that we can reduce the concurrency only at the output
>>>>> stage? (In Spark we would have done a repartition/coalesce).
>>>>>
>>>>> And, if it matters, we are using Apache Beam 2.2 via Scio, on Google
>>>>> Dataflow.
>>>>>
>>>>> Thank you in advance!
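
For reference, the "repartition" workaround described in the thread (pair with a random key in [0,n), group by key, ungroup) can be illustrated without Beam. The sketch below is plain Java and uses no Beam API; the names `ShardSketch`, `shard` and `ungroup` are hypothetical and exist only for this illustration. In a real pipeline the grouping step would be a GroupByKey, and the n-wise cap on parallelism is the behaviour of the Dataflow streaming runner as described in the thread, not something this code enforces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Plain-Java illustration only; none of these names are Beam APIs.
class ShardSketch {

    // Pair each element with a random key in [0, n) and group by that key.
    static <T> Map<Integer, List<T>> shard(List<T> elements, int n, Random random) {
        Map<Integer, List<T>> groups = new HashMap<>();
        for (T element : elements) {
            int key = random.nextInt(n); // uniform over 0 .. n-1
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    // Ungroup: drop the keys and emit the original elements again.
    static <T> List<T> ungroup(Map<Integer, List<T>> groups) {
        List<T> out = new ArrayList<>();
        for (List<T> group : groups.values()) {
            out.addAll(group);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            rows.add(i);
        }
        int n = 8;
        Map<Integer, List<Integer>> groups = shard(rows, n, new Random());
        System.out.println("groups: " + groups.size() + " (never more than " + n + ")");
        System.out.println("rows after ungroup: " + ungroup(groups).size());
    }
}
```

However many input rows there are, at most n distinct keys exist after the grouping step, which is why the downstream write would be spread over at most n groups.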

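
The other approach mentioned in the thread (create a connection pool in setup, then borrow a connection and return it in processElement) could look roughly like the sketch below. It is a minimal, generic fixed-size pool in plain Java, not the code Aleksandr used and not part of JdbcIO; `BoundedPool`, `borrow` and `giveBack` are hypothetical names. It assumes the pool is shared by all threads of one worker JVM (for example through a static field), since a pool per thread would not lower the connection count, and that in practice the pooled type would be `java.sql.Connection`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical fixed-size pool; T stands in for java.sql.Connection.
class BoundedPool<T> {
    private final BlockingQueue<T> idle;

    // Eagerly creates exactly `size` resources, so the total never grows.
    BoundedPool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // Borrow a resource, waiting up to timeoutMillis.
    // Returns null on timeout or if the waiting thread is interrupted.
    T borrow(long timeoutMillis) {
        try {
            return idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    // Hand a previously borrowed resource back to the pool.
    void giveBack(T resource) {
        idle.offer(resource);
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        BoundedPool<String> pool = new BoundedPool<>(2, () -> "connection");
        String c = pool.borrow(100);
        try {
            System.out.println("using " + c);
        } finally {
            pool.giveBack(c); // always return, even if the work fails
        }
        System.out.println("idle: " + pool.idleCount());
    }
}
```

With a pool like this, the number of open connections per worker is bounded by the pool size rather than by the number of threads the runner creates.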