On Wed, Mar 14, 2018 at 1:55 PM Aleksandr <[email protected]> wrote:

> I mean that in case of many threads beam will create many connections( per
> thread, per query). Lets say i have 10 different tables. So for each table
> insert beam will create own connection ++ for each thread for that insert.
>
Yes but not at the same time. E.g. if Dataflow has started 5 workers each
with 50 threads, you'll have 250 threads, it doesn't matter how many tables
you're writing to, even if it's a thousand tables - these 250 threads
together will collaboratively gradually do all the work that's needed.
Think of it as a pool of work to be done + a pool of threads that take some
work when idle.


>
> Lets say I have some uuid generation and BQ insert. In case of problems
> with BQ service the exception will be thrown, but my job will be restored
> from latest checkpoint. So I will not generate uuid for same message twice.
> In case of jdbcio it is possible to get uuid for same mesage twice( in case
> of multiple io it might be a problem).
>
Deduplication of writes based on record id is a BigQuery feature, I don't
think there's a way to accomplish it in general for an arbitrary
JDBC-compliant database. It's up to the user to ensure that their
statements in JdbcIO.write() are idempotent, I think the Javadoc says that,
and recommends using upsert-style statements (if it doesn't, let's fix the
javadoc).


>
> Aleksandr.
>
> 14. märts 2018 10:37 PM kirjutas kuupäeval "Eugene Kirpichov" <
> [email protected]>:
>
> Aleksandr - it seems that you're assuming that every prepared statement
> uses a connection. This is not the case: we open a connection, and use that
> connection to create prepared statements. For any given thread, there's at
> most 1 connection open at the same time, and the connection has at most 1
> prepared statement open.
>
> Create thread -> (open connection -> (open prepared statement ->
> executeBatch* -> close prepared statement)* -> close connection)*
>
> I'm not sure what you mean by checkpoints, can you elaborate?
>
> On Wed, Mar 14, 2018 at 1:20 PM Aleksandr <[email protected]> wrote:
>
>> So lets say I have 10 prepared statements, and hundreds threads, for
>> example 300. Dataflow will create 3000 connections to sql and in case of
>> autoscaling another node will create again 3000 connections?
>>
>> Another problem here, that jdbcio dont use any checkpoints (and bq for
>> example is doing that). So every connection exception will be thrown upper.
>>
>>
>> 14. märts 2018 10:09 PM kirjutas kuupäeval "Eugene Kirpichov" <
>> [email protected]>:
>>
>> In a streaming job it'll be roughly once per thread per worker, and
>> Dataflow Streaming runner may create hundreds of threads per worker because
>> it assumes that they are not heavyweight and that low latency is the
>> primary goal rather than high throughput (as in batch runner).
>>
>> A hacky way to limit this parallelism would be to emulate the
>> "repartition", by inserting a chain of transforms: pair with a random key
>> in [0,n), group by key, ungroup - procesing of the result until the next
>> GBK will not be parallelized more than n-wise in practice in the Dataflow
>> streaming runner, so in the particular case of JdbcIO.write() with its
>> current implementation it should help. It may break in the future, e.g. if
>> JdbcIO.write() ever changes to include a GBK before writing. Unfortunately
>> I can't recommend a long-term reliable solution for the moment.
>>
>> On Wed, Mar 14, 2018 at 12:57 PM Aleksandr <[email protected]>
>> wrote:
>>
>>> Hello,
>>> How many times will the setup per node be called? Is it possible to
>>> limit pardo intances in google dataflow?
>>>
>>> Aleksandr.
>>>
>>>
>>>
>>> 14. märts 2018 9:22 PM kirjutas kuupäeval "Eugene Kirpichov" <
>>> [email protected]>:
>>>
>>> "Jdbcio will create for each prepared statement new connection" - this
>>> is not the case: the connection is created in @Setup and deleted in
>>> @Teardown.
>>>
>>> https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L503
>>>
>>> https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L631
>>>
>>> Something else must be going wrong.
>>>
>>> On Wed, Mar 14, 2018 at 12:11 PM Aleksandr <[email protected]>
>>> wrote:
>>>
>>>> Hello, we had similar problem. Current jdbcio will cause alot of
>>>> connection errors.
>>>>
>>>> Typically you have more than one prepared statement. Jdbcio will create
>>>> for each prepared statement new connection(and close only in teardown) So
>>>> it is possible that connection will get timeot or in case in case of auto
>>>> scaling you will get to many connections to sql.
>>>> Our solution was to create connection pool in setup and get connection
>>>> and return back to pool in processElement.
>>>>
>>>> Best Regards,
>>>> Aleksandr Gortujev.
>>>>
>>>> 14. märts 2018 8:52 PM kirjutas kuupäeval "Jean-Baptiste Onofré" <
>>>> [email protected]>:
>>>>
>>>> Agree especially using the current JdbcIO impl that creates connection
>>>> in the @Setup. Or it means that @Teardown is never called ?
>>>>
>>>> Regards
>>>> JB
>>>> Le 14 mars 2018, à 11:40, Eugene Kirpichov <[email protected]> a
>>>> écrit:
>>>>>
>>>>> Hi Derek - could you explain where does the "3000 connections" number
>>>>> come from, i.e. how did you measure it? It's weird that 5-6 workers would
>>>>> use 3000 connections.
>>>>>
>>>>> On Wed, Mar 14, 2018 at 3:50 AM Derek Chan <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We are new to Beam and need some help.
>>>>>>
>>>>>> We are working on a flow to ingest events and writes the aggregated
>>>>>> counts to a database. The input rate is rather low (~2000 message per
>>>>>> sec), but the processing is relatively heavy, that we need to scale
>>>>>> out
>>>>>> to 5~6 nodes. The output (via JDBC) is aggregated, so the volume is
>>>>>> also
>>>>>> low. But because of the number of workers, it keeps 3000 connections
>>>>>> to
>>>>>> the database and it keeps hitting the database connection limits.
>>>>>>
>>>>>> Is there a way that we can reduce the concurrency only at the output
>>>>>> stage? (In Spark we would have done a repartition/coalesce).
>>>>>>
>>>>>> And, if it matters, we are using Apache Beam 2.2 via Scio, on Google
>>>>>> Dataflow.
>>>>>>
>>>>>> Thank you in advance!
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>>
>

Reply via email to