On Wed, Mar 14, 2018 at 1:55 PM Aleksandr wrote:

> I mean that in the case of many threads, Beam will create many connections (per thread, per query). Let's say I have 10 different tables. So for each table insert, Beam will create its own connection, plus one for each thread doing that insert.
>

Yes, but not at the same time. E.g. if Dataflow has started 5 workers, each with 50 threads, you'll have 250 threads. It doesn't matter how many tables you're writing to; even if it's a thousand tables, these 250 threads will collaboratively and gradually do all the work that's needed. Think of it as a pool of work to be done plus a pool of threads that take some work when idle.

>
> Let's say I have some UUID generation and a BQ insert. In case of problems with the BQ service, the exception will be thrown, but my job will be restored from the latest checkpoint, so I will not generate a UUID for the same message twice. In the case of JdbcIO, it is possible to get a UUID for the same message twice (with multiple IOs this might be a problem).
>

Deduplication of writes based on record ID is a BigQuery feature; I don't think there's a way to accomplish it in general for an arbitrary JDBC-compliant database. It's up to the user to ensure that their statements in JdbcIO.write() are idempotent. I think the Javadoc says that and recommends using upsert-style statements (if it doesn't, let's fix the Javadoc).

>
> Aleksandr.
>
> On 14 March 2018 10:37 PM, "Eugene Kirpichov" wrote:
>
> Aleksandr - it seems that you're assuming that every prepared statement uses a connection. This is not the case: we open a connection and use that connection to create prepared statements. For any given thread, there's at most 1 connection open at the same time, and the connection has at most 1 prepared statement open.
>
> Create thread -> (open connection -> (open prepared statement -> executeBatch* -> close prepared statement)* -> close connection)*
>
> I'm not sure what you mean by checkpoints, can you elaborate?
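Eugene's point that the number of open connections follows the number of threads, not the number of tables, can be illustrated with a small stdlib-only Java simulation. This is a hypothetical sketch, not Beam or Dataflow code: `ConnectionCountSimulation` and its parameters are made-up names, a `ThreadLocal` stands in for the one connection per thread that JdbcIO opens in `@Setup`, and a JVM fixed thread pool stands in for the 5 × 50 worker threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical simulation: a fixed pool of threads draining writes for many tables. */
final class ConnectionCountSimulation {

  private ConnectionCountSimulation() {}

  /**
   * Returns how many simulated connections were opened. Each thread lazily opens one
   * "connection" (a stand-in for the one opened in @Setup) and reuses it for every
   * write it executes, whichever table that write targets.
   */
  static int connectionsOpened(int workers, int threadsPerWorker, int tables, int writesPerTable) {
    AtomicInteger opened = new AtomicInteger();
    ThreadLocal<Object> connection =
        ThreadLocal.withInitial(
            () -> {
              opened.incrementAndGet();
              return new Object(); // placeholder for a real java.sql.Connection
            });
    // The pool of threads: workers * threadsPerWorker in total (e.g. 5 * 50 = 250).
    ExecutorService threads = Executors.newFixedThreadPool(workers * threadsPerWorker);
    // The pool of work: one task per write, for every table.
    for (int table = 0; table < tables; table++) {
      for (int write = 0; write < writesPerTable; write++) {
        threads.execute(() -> connection.get());
      }
    }
    threads.shutdown();
    try {
      threads.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return opened.get();
  }
}
```

With this sketch, `connectionsOpened(5, 50, 1000, 1)` returns 250 and `connectionsOpened(5, 50, 10, 2)` returns 20: the count tracks how many threads actually run, and a thousand tables do not multiply it.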
>
> On Wed, Mar 14, 2018 at 1:20 PM Aleksandr wrote:
>
>> So let's say I have 10 prepared statements and hundreds of threads, for example 300. Will Dataflow create 3000 connections to SQL, and in the case of autoscaling, will another node again create 3000 connections?
>>
>> Another problem here is that JdbcIO doesn't use any checkpoints (BQ, for example, does). So every connection exception will be thrown upward.
>>
>>
>> On 14 March 2018 10:09 PM, "Eugene Kirpichov" wrote:
>>
>> In a streaming job it'll be roughly once per thread per worker, and the Dataflow streaming runner may create hundreds of threads per worker, because it assumes that they are not heavyweight and that low latency, rather than high throughput (as in the batch runner), is the primary goal.
>>
>> A hacky way to limit this parallelism would be to emulate the "repartition" by inserting a chain of transforms: pair with a random key in [0, n), group by key, ungroup. Processing of the result until the next GBK will not be parallelized more than n-wise in practice in the Dataflow streaming runner, so in the particular case of JdbcIO.write() with its current implementation it should help. It may break in the future, e.g. if JdbcIO.write() ever changes to include a GBK before writing. Unfortunately, I can't recommend a long-term reliable solution for the moment.
>>
>> On Wed, Mar 14, 2018 at 12:57 PM Aleksandr wrote:
>>
>>> Hello,
>>> How many times will setup be called per node? Is it possible to limit the number of ParDo instances in Google Dataflow?
>>>
>>> Aleksandr.
>>>
>>>
>>>
>>> On 14 March 2018 9:22 PM, "Eugene Kirpichov" wrote:
>>>
>>> "Jdbcio will create for each prepared statement new connection" - this is not the case: the connection is created in @Setup and deleted in @Teardown.
>>>
>>> https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L503
>>>
>>> https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L631
>>>
>>> Something else must be going wrong.
>>>
>>> On Wed, Mar 14, 2018 at 12:11 PM Aleksandr wrote:
>>>
>>>> Hello, we had a similar problem. The current JdbcIO will cause a lot of connection errors.
>>>>
>>>> Typically you have more than one prepared statement. JdbcIO will create a new connection for each prepared statement (and close it only in teardown). So it is possible that the connection will time out, or, in the case of autoscaling, you will get too many connections to SQL.
>>>> Our solution was to create a connection pool in setup, and in processElement to get a connection from the pool and return it afterwards.
>>>>
>>>> Best Regards,
>>>> Aleksandr Gortujev.
>>>>
>>>> On 14 March 2018 8:52 PM, "Jean-Baptiste Onofré" wrote:
>>>>
>>>> Agreed, especially with the current JdbcIO implementation, which creates the connection in @Setup. Or does it mean that @Teardown is never called?
>>>>
>>>> Regards
>>>> JB
>>>> On 14 March 2018, at 11:40, Eugene Kirpichov wrote:
>>>>>
>>>>> Hi Derek - could you explain where the "3000 connections" number comes from, i.e. how did you measure it? It's weird that 5-6 workers would use 3000 connections.
>>>>>
>>>>> On Wed, Mar 14, 2018 at 3:50 AM Derek Chan wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We are new to Beam and need some help.
>>>>>>
>>>>>> We are working on a flow to ingest events and write the aggregated counts to a database. The input rate is rather low (~2000 messages per second), but the processing is relatively heavy, so we need to scale out to 5-6 nodes. The output (via JDBC) is aggregated, so the volume is also low. But because of the number of workers, it keeps 3000 connections open to the database and keeps hitting the database connection limits.
>>>>>>
>>>>>> Is there a way that we can reduce the concurrency only at the output stage? (In Spark we would have done a repartition/coalesce.)
>>>>>>
>>>>>> And, if it matters, we are using Apache Beam 2.2 via Scio, on Google Dataflow.
>>>>>>
>>>>>> Thank you in advance!
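Eugene's workaround for Derek's question (pair each element with a random key in [0, n), group by key, ungroup) can be modeled in plain Java to show why it caps parallelism: after the grouping step there are at most n groups, which, per his description, the Dataflow streaming runner would then process no more than n-wise until the next GBK. This is a hypothetical, stdlib-only sketch; `RandomKeyRepartition` and its methods are illustrative names, not Beam APIs. In a Beam Java pipeline the same three steps would plausibly correspond to `WithKeys`, `GroupByKey`, and `Values` followed by `Flatten.iterables()` ahead of `JdbcIO.write()`, keeping in mind that a `GroupByKey` on an unbounded input also needs non-global windowing or a trigger, and Eugene's caveat that this relies on current runner behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

/** Hypothetical in-memory model of "pair with a random key in [0, n), group by key, ungroup". */
final class RandomKeyRepartition {

  private RandomKeyRepartition() {}

  /** Steps 1 and 2: pair every item with a random key in [0, n) and group by that key. */
  static <T> Map<Integer, List<T>> groupByRandomKey(List<T> items, int n, Random random) {
    Map<Integer, List<T>> groups = new TreeMap<>();
    for (T item : items) {
      int key = random.nextInt(n); // uniformly distributed in [0, n)
      groups.computeIfAbsent(key, k -> new ArrayList<>()).add(item);
    }
    return groups;
  }

  /** Step 3: drop the keys and flatten the groups back into a single list of items. */
  static <T> List<T> ungroup(Map<Integer, List<T>> groups) {
    List<T> items = new ArrayList<>();
    for (List<T> group : groups.values()) {
      items.addAll(group);
    }
    return items;
  }
}
```

With n = 4 and 10,000 input items, `groupByRandomKey` yields at most four groups and `ungroup` returns all 10,000 items. Choosing n comfortably below the database's connection limit is what would presumably keep the output stage under that limit.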

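Aleksandr's workaround (create a connection pool in setup, then borrow a connection and return it in processElement) can be sketched with a minimal bounded pool built on `java.util.concurrent`. `BoundedPool` and `withResource` are hypothetical names; in a real DoFn the resource would be a `java.sql.Connection` created in `@Setup`, and an existing pooling `DataSource` library would more likely be used than this hand-rolled class.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical fixed-size pool: at most `capacity` resources are ever created. */
final class BoundedPool<C> {
  private final BlockingQueue<C> idle;

  /** Creates all resources up front, as one might do once in a DoFn's @Setup. */
  BoundedPool(int capacity, Supplier<C> factory) {
    idle = new ArrayBlockingQueue<>(capacity);
    for (int i = 0; i < capacity; i++) {
      idle.add(factory.get());
    }
  }

  /** Borrows a resource, runs `work` with it, and always returns it (per element). */
  <R> R withResource(Function<C, R> work) {
    C resource;
    try {
      resource = idle.take(); // blocks while every resource is borrowed
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a resource", e);
    }
    try {
      return work.apply(resource);
    } finally {
      idle.offer(resource); // always has room: this resource came out of the queue
    }
  }
}
```

One caveat worth checking: if every DoFn instance builds its own pool in `@Setup`, the total is still roughly instances × capacity, so capping connections per worker would presumably require sharing one pool across the instances in a JVM, for example through a static holder.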