Re: Reducing database connection with JdbcIO

2018-03-14 Thread Eugene Kirpichov
On Wed, Mar 14, 2018 at 1:55 PM Aleksandr  wrote:

> I mean that in the case of many threads, Beam will create many connections (per
> thread, per query). Let's say I have 10 different tables. So for each table
> insert, Beam will create its own connection, plus one per thread for that insert.
>
Yes, but not at the same time. E.g. if Dataflow has started 5 workers, each
with 50 threads, you'll have 250 threads; it doesn't matter how many tables
you're writing to, even if it's a thousand tables - these 250 threads
together will collaboratively, gradually do all the work that's needed.
Think of it as a pool of work to be done plus a pool of threads that take
some work when idle.


>
> Let's say I have some UUID generation and a BQ insert. In case of problems
> with the BQ service an exception will be thrown, but my job will be restored
> from the latest checkpoint, so I will not generate a UUID for the same message
> twice. In the case of JdbcIO it is possible to get a UUID for the same message
> twice (in the case of multiple IOs it might be a problem).
>
Deduplication of writes based on record id is a BigQuery feature, I don't
think there's a way to accomplish it in general for an arbitrary
JDBC-compliant database. It's up to the user to ensure that their
statements in JdbcIO.write() are idempotent, I think the Javadoc says that,
and recommends using upsert-style statements (if it doesn't, let's fix the
javadoc).
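To make the idempotency point concrete: an upsert-style statement applied twice leaves the table in the same state, which is exactly why a retried bundle is harmless. The minimal sketch below simulates the table with a Map; the SQL in the comment is one PostgreSQL-flavored example of such a statement, not anything JdbcIO generates itself.

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertIdempotency {
    // Simulates e.g. "INSERT INTO t (id, val) VALUES (?, ?)
    //                 ON CONFLICT (id) DO UPDATE SET val = EXCLUDED.val"
    static void upsert(Map<String, Integer> table, String id, int val) {
        table.put(id, val); // insert-or-overwrite: applying it again changes nothing
    }

    public static void main(String[] args) {
        Map<String, Integer> table = new HashMap<>();
        upsert(table, "a", 1);
        Map<String, Integer> afterOnce = new HashMap<>(table);
        upsert(table, "a", 1); // a retry of the same bundle
        System.out.println(table.equals(afterOnce)); // true: state unchanged
    }
}
```

A plain INSERT without the conflict clause would fail (or duplicate rows) on retry, which is the failure mode the Javadoc's upsert recommendation is guarding against.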


>
> Aleksandr.
>
> On 14 March 2018 at 10:37 PM, "Eugene Kirpichov" <
> kirpic...@google.com> wrote:
>
> Aleksandr - it seems that you're assuming that every prepared statement
> uses a connection. This is not the case: we open a connection, and use that
> connection to create prepared statements. For any given thread, there's at
> most 1 connection open at the same time, and the connection has at most 1
> prepared statement open.
>
> Create thread -> (open connection -> (open prepared statement ->
> executeBatch* -> close prepared statement)* -> close connection)*
>
> I'm not sure what you mean by checkpoints, can you elaborate?
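The lifecycle diagram above can be checked with a small counting fake (nothing here is a Beam or JDBC API; the counters just stand in for open/close calls) to show that a thread never holds more than one connection at a time:

```java
public class ConnectionLifecycle {
    static int openConnections = 0;
    static int maxOpenConnections = 0;

    static void openConnection() {
        openConnections++;
        maxOpenConnections = Math.max(maxOpenConnections, openConnections);
    }

    static void closeConnection() {
        openConnections--;
    }

    public static void main(String[] args) {
        // Create thread -> (open connection -> (open prepared statement ->
        //   executeBatch* -> close prepared statement)* -> close connection)*
        for (int bundle = 0; bundle < 3; bundle++) { // each bundle reuses one connection
            openConnection();
            for (int stmt = 0; stmt < 10; stmt++) {
                // open prepared statement, executeBatch, close prepared statement:
                // statements are created on the already-open connection,
                // so they do not add connections.
            }
            closeConnection();
        }
        System.out.println(maxOpenConnections); // 1: never more than one at a time
    }
}
```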
>
> On Wed, Mar 14, 2018 at 1:20 PM Aleksandr  wrote:
>
>> So let's say I have 10 prepared statements and hundreds of threads, for
>> example 300. Will Dataflow create 3000 connections to SQL, and in case of
>> autoscaling will another node create again 3000 connections?
>>
>> Another problem here is that JdbcIO doesn't use any checkpoints (and BQ, for
>> example, does). So every connection exception will be propagated upward.
>>
>>
>> On 14 March 2018 at 10:09 PM, "Eugene Kirpichov" <
>> kirpic...@google.com> wrote:
>>
>> In a streaming job it'll be roughly once per thread per worker, and
>> Dataflow Streaming runner may create hundreds of threads per worker because
>> it assumes that they are not heavyweight and that low latency is the
>> primary goal rather than high throughput (as in batch runner).
>>
>> A hacky way to limit this parallelism would be to emulate the
>> "repartition", by inserting a chain of transforms: pair with a random key
> in [0,n), group by key, ungroup - processing of the result until the next
>> GBK will not be parallelized more than n-wise in practice in the Dataflow
>> streaming runner, so in the particular case of JdbcIO.write() with its
>> current implementation it should help. It may break in the future, e.g. if
>> JdbcIO.write() ever changes to include a GBK before writing. Unfortunately
>> I can't recommend a long-term reliable solution for the moment.
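A sketch of the keying step behind this hack (the helper name assignShard is made up here; the surrounding Beam chain is only indicated in the comment). The essential property is that keys fall in [0, n), so at most n-wise parallelism survives up to the next GBK:

```java
import java.util.concurrent.ThreadLocalRandom;

public class LimitParallelism {
    // In the pipeline this key would be assigned in a DoFn before a GroupByKey,
    // roughly: pair each element with assignShard(n), GroupByKey, then re-emit
    // the grouped values. Only n distinct keys exist, so the runner cannot
    // parallelize the keyed stage more than n-wise.
    static int assignShard(int n) {
        return ThreadLocalRandom.current().nextInt(n); // uniform in [0, n)
    }

    public static void main(String[] args) {
        int n = 8;
        boolean allInRange = true;
        for (int i = 0; i < 10_000; i++) {
            int k = assignShard(n);
            if (k < 0 || k >= n) allInRange = false;
        }
        System.out.println(allInRange); // true: every key lands in [0, n)
    }
}
```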
>>
>> On Wed, Mar 14, 2018 at 12:57 PM Aleksandr 
>> wrote:
>>
>>> Hello,
>>> How many times will the setup be called per node? Is it possible to
>>> limit ParDo instances in Google Dataflow?
>>>
>>> Aleksandr.
>>>
>>>
>>>
>>> On 14 March 2018 at 9:22 PM, "Eugene Kirpichov" <
>>> kirpic...@google.com> wrote:
>>>
>>> "Jdbcio will create for each prepared statement new connection" - this
>>> is not the case: the connection is created in @Setup and deleted in
>>> @Teardown.
>>>
>>> https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L503
>>>
>>> https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L631
>>>
>>> Something else must be going wrong.
>>>
>>> On Wed, Mar 14, 2018 at 12:11 PM Aleksandr 
>>> wrote:
>>>
 Hello, we had a similar problem. The current JdbcIO will cause a lot of
 connection errors.

 Typically you have more than one prepared statement. JdbcIO will create
 a new connection for each prepared statement (and close it only in teardown), so
 it is possible that a connection will time out, or, in case of
 autoscaling, you will get too many connections to SQL.
 Our solution was to create a connection pool in setup, and to get a connection
 and return it back to the pool in processElement.

 Best Regards,
 Aleksandr Gortujev.
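The pool idea described above can be sketched with a tiny generic pool; in a real pipeline the pooled type would be java.sql.Connection and one would more likely reach for a pooling DataSource such as HikariCP. Everything below (class and method names) is illustrative:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

public class SimplePool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private int created = 0;

    SimplePool(Supplier<T> factory) { this.factory = factory; }

    // Called from processElement: reuse an idle resource or create one.
    synchronized T borrow() {
        if (idle.isEmpty()) { created++; return factory.get(); }
        return idle.pop();
    }

    // Return the resource so the next element can reuse it.
    synchronized void giveBack(T t) { idle.push(t); }

    public static void main(String[] args) {
        // Stand-in for java.sql.Connection; a DoFn would build the pool in @Setup.
        SimplePool<String> pool = new SimplePool<>(() -> "conn");
        for (int element = 0; element < 100; element++) {
            String c = pool.borrow(); // processElement: take a connection,
            pool.giveBack(c);         // run the statement, then return it
        }
        System.out.println(pool.created); // 1: sequential elements share one connection
    }
}
```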

 On 14 March 2018 at 8:52 PM,

Re: Reducing database connection with JdbcIO

2018-03-14 Thread Jean-Baptiste Onofré

Hi Aleksandr,

I don't get your point about the connection: on your worker, who is going to
create the threads? Basically, we will have a connection per thread
right now.


For the checkpoint, JdbcIO is basically a DoFn; it doesn't deal with a
CheckpointMark (if that's what you are talking about). Recently, we added
support for backoff retries on the write part: you can specify a kind of
Exception for which JdbcIO will retry the statement
(especially interesting in case of deadlock).
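Conceptually, that retry support behaves like the loop below - a hand-rolled sketch, not JdbcIO's actual implementation; IllegalStateException stands in for whatever retryable exception type is configured:

```java
public class RetryWithBackoff {
    interface Statement { void execute() throws Exception; }

    // Retry the statement on retryable exceptions, doubling the pause each time.
    static int executeWithRetry(Statement s, int maxAttempts) throws Exception {
        long backoffMs = 1;
        for (int attempt = 1; ; attempt++) {
            try { s.execute(); return attempt; }
            catch (IllegalStateException retryable) { // stand-in for e.g. a deadlock SQLException
                if (attempt == maxAttempts) throw retryable;
                Thread.sleep(backoffMs);
                backoffMs *= 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] failuresLeft = {2}; // fail twice (e.g. deadlocks), then succeed
        int attempts = executeWithRetry(() -> {
            if (failuresLeft[0]-- > 0) throw new IllegalStateException("deadlock");
        }, 5);
        System.out.println(attempts); // 3
    }
}
```

Non-retryable exceptions would simply propagate, so the statement is only re-run for the configured failure kind.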


Regards
JB


Re: Reducing database connection with JdbcIO

2018-03-14 Thread Aleksandr
I mean that in the case of many threads, Beam will create many connections (per
thread, per query). Let's say I have 10 different tables. So for each table
insert, Beam will create its own connection, plus one per thread for that insert.

Let's say I have some UUID generation and a BQ insert. In case of problems
with the BQ service an exception will be thrown, but my job will be restored
from the latest checkpoint, so I will not generate a UUID for the same message
twice. In the case of JdbcIO it is possible to get a UUID for the same message
twice (in the case of multiple IOs it might be a problem).

Aleksandr.


Re: Reducing database connection with JdbcIO

2018-03-14 Thread Eugene Kirpichov
Aleksandr - it seems that you're assuming that every prepared statement
uses a connection. This is not the case: we open a connection, and use that
connection to create prepared statements. For any given thread, there's at
most 1 connection open at the same time, and the connection has at most 1
prepared statement open.

Create thread -> (open connection -> (open prepared statement ->
executeBatch* -> close prepared statement)* -> close connection)*

I'm not sure what you mean by checkpoints, can you elaborate?

>>> On 14 March 2018 at 8:52 PM, "Jean-Baptiste Onofré" <
>>> j...@nanthrax.net> wrote:
>>>
>>> Agree, especially since the current JdbcIO impl creates the connection
>>> in @Setup. Or does it mean that @Teardown is never called?
>>>
>>> Regards
>>> JB
>>> On 14 March 2018, at 11:40, Eugene Kirpichov  wrote:

 Hi Derek - could you explain where the "3000 connections" number
 comes from, i.e. how did you measure it? It's weird that 5-6 workers would
 use 3000 connections.

 On Wed, Mar 14, 2018 at 3:50 AM Derek Chan  wrote:

> Hi,
>
> We are new to Beam and need some help.
>
> We are working on a flow that ingests events and writes the aggregated
> counts to a database. The input rate is rather low (~2000 messages per
> sec), but the processing is heavy enough that we need to scale out
> to 5~6 nodes. The output (via JDBC) is aggregated, so the volume is also
> low. But because of the number of workers, it keeps 3000 connections to
> the database and it keeps hitting the database connection limits.
>
> Is there a way that we can reduce the concurrency only at the output
> stage? (In Spark we would have done a repartition/coalesce).
>
> And, if it matters, we are using Apache Beam 2.2 via Scio, on Google
> Dataflow.
>
> Thank you in advance!
>
>
>
>
>>>
>>
>


Re: Reducing database connection with JdbcIO

2018-03-14 Thread Aleksandr
So let's say I have 10 prepared statements and hundreds of threads, for
example 300. Will Dataflow create 3000 connections to SQL, and in case of
autoscaling will another node create again 3000 connections?

Another problem here is that JdbcIO doesn't use any checkpoints (and BQ, for
example, does). So every connection exception will be propagated upward.



Re: Reducing database connection with JdbcIO

2018-03-14 Thread Eugene Kirpichov
In a streaming job it'll be roughly once per thread per worker, and
Dataflow Streaming runner may create hundreds of threads per worker because
it assumes that they are not heavyweight and that low latency is the
primary goal rather than high throughput (as in batch runner).

A hacky way to limit this parallelism would be to emulate the
"repartition", by inserting a chain of transforms: pair with a random key
in [0,n), group by key, ungroup - processing of the result until the next
GBK will not be parallelized more than n-wise in practice in the Dataflow
streaming runner, so in the particular case of JdbcIO.write() with its
current implementation it should help. It may break in the future, e.g. if
JdbcIO.write() ever changes to include a GBK before writing. Unfortunately
I can't recommend a long-term reliable solution for the moment.



Re: Reducing database connection with JdbcIO

2018-03-14 Thread Aleksandr
Hello,
How many times will the setup be called per node? Is it possible to limit
ParDo instances in Google Dataflow?

Aleksandr.





Re: Reducing database connection with JdbcIO

2018-03-14 Thread Eugene Kirpichov
I wonder if this is https://issues.apache.org/jira/browse/BEAM-3245 , but
in that JIRA it is unclear exactly in which cases it is broken and how
badly. +Thomas Groh  +Kenn Knowles  - the Dataflow Runner
normally doesn't leak DoFns if their ProcessElement threw an exception,
right?

On Wed, Mar 14, 2018 at 12:24 PM Romain Manni-Bucau 
wrote:

> side note: try to do a thread dump on the workers maybe
>
>
> Romain Manni-Bucau
>


Re: Reducing database connection with JdbcIO

2018-03-14 Thread Eugene Kirpichov
"Jdbcio will create for each prepared statement new connection" - this is
not the case: the connection is created in @Setup and deleted in @Teardown.
https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L503
https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L631

Something else must be going wrong.

On Wed, Mar 14, 2018 at 12:11 PM Aleksandr  wrote:

> Hello, we had a similar problem. The current JdbcIO will cause a lot of
> connection errors.
>
> Typically you have more than one prepared statement. JdbcIO will create a
> new connection for each prepared statement (and close it only in teardown),
> so it is possible that a connection will time out or, in the case of
> autoscaling, that you will end up with too many connections to the SQL
> server.
> Our solution was to create a connection pool in setup, and to acquire a
> connection in processElement and return it to the pool afterwards.
>
> Best Regards,
> Aleksandr Gortujev.
>


Re: Reducing database connection with JdbcIO

2018-03-14 Thread Aleksandr
Hello, we had a similar problem. The current JdbcIO will cause a lot of
connection errors.

Typically you have more than one prepared statement. JdbcIO will create a new
connection for each prepared statement (and close it only in teardown), so it
is possible that a connection will time out or, in the case of autoscaling,
that you will end up with too many connections to the SQL server.
Our solution was to create a connection pool in setup, and to acquire a
connection in processElement and return it to the pool afterwards.
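A minimal sketch of this pooled approach, in plain Java so it runs without a database (the `SimplePool` class and its generic factory are illustrative, not the code we actually used; in a real pipeline a pooling library such as HikariCP would typically back a shared DataSource):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Minimal fixed-size pool: connections are created once, then borrowed in
// processElement and returned afterwards, so the connection count is bounded
// by the pool size rather than by the number of prepared statements.
class SimplePool<C> {
    private final BlockingQueue<C> idle;

    SimplePool(int size, Supplier<C> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get()); // create all `size` connections up front
        }
    }

    C acquire() {
        try {
            return idle.take(); // blocks if every connection is borrowed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted waiting for a connection", e);
        }
    }

    void release(C conn) {
        idle.add(conn); // hand the connection back for reuse
    }
}
```

In processElement you would call `acquire()`, run the statement, and `release()` in a finally block, so a worker with many threads still never exceeds the pool size in open connections.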

Best Regards,
Aleksandr Gortujev.

On 14 March 2018 at 8:52 PM, "Jean-Baptiste Onofré" <
j...@nanthrax.net> wrote:

Agreed, especially since the current JdbcIO impl creates the connection in
@Setup. Or does it mean that @Teardown is never called?

Regards
JB


Re: Reducing database connection with JdbcIO

2018-03-14 Thread Jean-Baptiste Onofré
Agreed, especially since the current JdbcIO impl creates the connection in
@Setup. Or does it mean that @Teardown is never called?

Regards
JB

On 14 March 2018 at 11:40, Eugene Kirpichov  wrote:
>Hi Derek - could you explain where the "3000 connections" number comes
>from, i.e. how did you measure it? It's weird that 5-6 workers would
>use 3000 connections.


Re: Reducing database connection with JdbcIO

2018-03-14 Thread Eugene Kirpichov
Hi Derek - could you explain where the "3000 connections" number comes
from, i.e. how did you measure it? It's weird that 5-6 workers would use
3000 connections.

On Wed, Mar 14, 2018 at 3:50 AM Derek Chan  wrote:

> Hi,
>
> We are new to Beam and need some help.
>
> We are working on a flow that ingests events and writes the aggregated
> counts to a database. The input rate is rather low (~2000 messages per
> sec), but the processing is heavy enough that we need to scale out
> to 5-6 nodes. The output (via JDBC) is aggregated, so its volume is also
> low. But because of the number of workers, the job keeps 3000 connections
> to the database and keeps hitting the database connection limit.
>
> Is there a way to reduce the concurrency only at the output
> stage? (In Spark we would have done a repartition/coalesce.)
>
> And, if it matters, we are using Apache Beam 2.2 via Scio, on Google
> Dataflow.
>
> Thank you in advance!
>
>
>
>


Re: Reducing database connection with JdbcIO

2018-03-14 Thread Jean-Baptiste Onofré

Hi Derek,

I think you might be interested in:

https://github.com/apache/beam/pull/4461

related to BEAM-3500.

I introduced an internal poolable datasource.

I hope it helps.

Regards
JB

On 14/03/2018 11:49, Derek Chan wrote:

Hi,

We are new to Beam and need some help.

We are working on a flow that ingests events and writes the aggregated
counts to a database. The input rate is rather low (~2000 messages per
sec), but the processing is heavy enough that we need to scale out
to 5-6 nodes. The output (via JDBC) is aggregated, so its volume is also
low. But because of the number of workers, the job keeps 3000 connections
to the database and keeps hitting the database connection limit.


Is there a way to reduce the concurrency only at the output
stage? (In Spark we would have done a repartition/coalesce.)


And, if it matters, we are using Apache Beam 2.2 via Scio, on Google 
Dataflow.


Thank you in advance!





Re: Reducing database connection with JdbcIO

2018-03-14 Thread Aleksandr
Hello,
We made our own JdbcIO with a thread pool per JVM (using lazy initialization
in @Setup). In processElement we acquire and release connections.

Best Regards,
Aleksandr Gortujev.

On 14 March 2018 at 12:49 PM, "Derek Chan"  wrote:

Hi,

We are new to Beam and need some help.

We are working on a flow that ingests events and writes the aggregated counts
to a database. The input rate is rather low (~2000 messages per sec), but
the processing is heavy enough that we need to scale out to 5-6 nodes.
The output (via JDBC) is aggregated, so its volume is also low. But because
of the number of workers, the job keeps 3000 connections to the database and
keeps hitting the database connection limit.

Is there a way to reduce the concurrency only at the output stage?
(In Spark we would have done a repartition/coalesce.)

And, if it matters, we are using Apache Beam 2.2 via Scio, on Google
Dataflow.

Thank you in advance!


Reducing database connection with JdbcIO

2018-03-14 Thread Derek Chan

Hi,

We are new to Beam and need some help.

We are working on a flow that ingests events and writes the aggregated
counts to a database. The input rate is rather low (~2000 messages per
sec), but the processing is heavy enough that we need to scale out
to 5-6 nodes. The output (via JDBC) is aggregated, so its volume is also
low. But because of the number of workers, the job keeps 3000 connections
to the database and keeps hitting the database connection limit.


Is there a way to reduce the concurrency only at the output
stage? (In Spark we would have done a repartition/coalesce.)


And, if it matters, we are using Apache Beam 2.2 via Scio, on Google 
Dataflow.


Thank you in advance!
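A common Beam-side answer to this question (not proposed in the thread above; names here are illustrative) is to assign each aggregated record one of N shard keys, GroupByKey on that key, and write each group from a single step, capping output parallelism at roughly N writers. The keying step is just:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative helper (not part of the Beam API): map any value to one of
// `numShards` keys. In a pipeline you would emit KV.of(shardKey(row, 16), row)
// from a DoFn, apply GroupByKey, and open one JDBC connection per group in the
// write step, so at most 16 writers run concurrently.
class Sharding {
    static int shardKey(Object value, int numShards) {
        // floorMod keeps the result in [0, numShards) even for negative hashes
        return Math.floorMod(value.hashCode(), numShards);
    }
}
```

The trade-off is the same as Spark's repartition/coalesce: fewer shards means fewer database connections but less write throughput.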