Hi,

Not tested, but here are a few options that might be solutions to your problem:

1. Go with read and write replicas of your DB, so that the write replica gets the inserts one by one, and live with this. Make sure to deduplicate the data before the insert to avoid potential collisions (this should not be a problem, but I am not sure how the subsystem would behave); a sketch of the dedup step is below.
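Untested, but a minimal sketch of that dedup step: keep only the newest record per id within each (here fixed, 60-second) window before handing the data to `WriteToJdbc`. `ExampleRow` mirrors the schema from your pipeline below, and `beam.Create` stands in for the Kafka read purely for illustration.

```
import typing

import apache_beam as beam
from apache_beam.transforms import window

ExampleRow = typing.NamedTuple('ExampleRow', id=int, name=str, load_date_time=str)

with beam.Pipeline() as p:
    deduped = (
        p
        | beam.Create([
            ExampleRow(1, 'zzzz', '2023-04-05 12:34:56'),
            ExampleRow(1, 'yyyz', '2023-04-05 12:34:55'),
        ]).with_output_types(ExampleRow)
        # Bound the dedup scope with fixed windows.
        | 'Window' >> beam.WindowInto(window.FixedWindows(60))
        | 'Key by id' >> beam.WithKeys(lambda row: row.id)
        | 'Group per id' >> beam.GroupByKey()
        # 'YYYY-MM-DD HH:MM:SS' strings compare correctly as plain strings,
        # so max() picks the newest load_date_time per id.
        | 'Keep newest' >> beam.MapTuple(
            lambda _id, rows: max(rows, key=lambda r: r.load_date_time)
          ).with_output_types(ExampleRow)
        # ... then the same WriteToJdbc step as in your original pipeline.
    )
```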

2.

- Add a step to group the input data into time windows.

- Then ingest the events from each window into a unique temp table (just with a plain `INSERT INTO`).

- Then add the next step in the pipeline to trigger a merge operation from the temp table into your production table. Make sure the same connection session is used or the temp table will be gone. I am also not sure whether it is possible to invoke only one command per window with the `WriteToJdbc` operator after the previous write finishes, so this is up for your experimentation (or maybe someone with more experience knows how to code it). A rough sketch of the merge step follows below.
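To illustrate that last bullet, a rough, untested sketch of the merge step. I do not know of a way to make `WriteToJdbc` itself run a single statement per window, so this sketch falls back to a plain psycopg2 call inside a DoFn (my assumption, not the Beam JDBC connector), and it uses an ordinary staging table instead of a session-scoped temp table to sidestep the same-connection problem. The `staging_test` table and the column names are hypothetical, mirroring your `test` table.

```
import apache_beam as beam

# Assumes a pre-created staging table 'staging_test' with the same columns as
# 'test', already deduplicated per id for the current window.
MERGE_SQL = """
    INSERT INTO test (id, name, load_date_time)
    SELECT id, name, load_date_time FROM staging_test
    ON CONFLICT (id) DO UPDATE
        SET name = EXCLUDED.name,
            load_date_time = EXCLUDED.load_date_time
        WHERE EXCLUDED.load_date_time::timestamp > test.load_date_time::timestamp;
    TRUNCATE staging_test;
"""

class MergeStagingTable(beam.DoFn):
    """Runs one merge per incoming element; feed it one signal element per window."""

    def process(self, _signal):
        import psycopg2  # assumed to be installed on the workers

        conn = psycopg2.connect(host='localhost', dbname='postgres',
                                user='postgres', password='postgres')
        try:
            # Both statements run in a single transaction, committed on exit.
            with conn, conn.cursor() as cur:
                cur.execute(MERGE_SQL)
        finally:
            conn.close()
```

The open question remains how to emit exactly one signal element per window only after the staging insert has finished; that ordering is the part I would experiment with.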

3. Another option is to

- aggregate events into a window, so that only one element is emitted per window (an array of events) - see the Beam-side sketch after the SQL below

- try somehow to put this record into the statement as a single row

- in subsequent CTEs deserialize the array into multiple rows

- do the insert with update (upsert).

So the SQL on the DB side would look something like:

```
WITH new_values (arr) AS (
    VALUES (?)
),
deser AS (
    -- unnest() expands the bound array into one row per element
    SELECT id, field1, field2
    FROM new_values, unnest(arr)
),
upsert AS (
    UPDATE mytable m
    SET field1 = nv.field1,
        field2 = nv.field2
    FROM deser nv
    WHERE m.id = nv.id
    RETURNING m.*
)
INSERT INTO mytable (id, field1, field2)
SELECT d.id, d.field1, d.field2
FROM deser d
WHERE NOT EXISTS (SELECT 1 FROM upsert up WHERE up.id = d.id)
```

The above is just pseudocode that I did not test, but it could be a hint for you. There is also a great answer on this here: https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/8702291#8702291
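For the Beam side of option 3, a rough, untested sketch of the aggregation step: collapse each window into a single element (a list of events) so that only one statement parameter would be bound per window. How that list gets serialized into something the JDBC statement accepts (a Postgres array, JSON, ...) is the open part; the JSON encoding below is just my assumption.

```
import json
import typing

import apache_beam as beam
from apache_beam.transforms import window

ExampleRow = typing.NamedTuple('ExampleRow', id=int, name=str, load_date_time=str)

with beam.Pipeline() as p:
    one_row_per_window = (
        p
        | beam.Create([
            ExampleRow(1, 'zzzz', '2023-04-05 12:34:56'),
            ExampleRow(1, 'yyyz', '2023-04-05 12:34:55'),
        ]).with_output_types(ExampleRow)
        | 'Window' >> beam.WindowInto(window.FixedWindows(60))
        # One list of events per window (and per firing) instead of single rows.
        | 'To one list per window' >> beam.CombineGlobally(
            beam.combiners.ToListCombineFn()).without_defaults()
        # One possible encoding: a JSON string that the SQL side could unpack
        # with jsonb_array_elements() / jsonb_to_recordset() instead of unnest().
        | 'Serialize' >> beam.Map(lambda rows: json.dumps([r._asdict() for r in rows]))
        # ... this single value would then be bound as the statement parameter.
    )
```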

4. Some mix of options 2 and 3.

Hopefully this helps you in breaking down the problem.

Best regards

Wiśniowski Piotr

On 21.04.2023 01:34, Juan Romero wrote:
Hi. Can someone help me with this?

On Wed, 19 Apr 2023 at 15:08, Juan Romero (<jsrf...@gmail.com>) wrote:

    Hi community.

    On this occasion I have a doubt regarding how to read a stream
    from Kafka and write batches of data with the JDBC connector. The
    idea is to override a specific row if the row we want to insert
    has the same id and a greater load_date_time. The conceptual
    pipeline looks like this and it is working (keep in mind that the
    source will be a stream from Kafka):

    import typing

    import apache_beam as beam
    from apache_beam.io.jdbc import WriteToJdbc

    ExampleRow = typing.NamedTuple('ExampleRow', id=int, name=str, load_date_time=str)

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([
                ExampleRow(1, 'zzzz', '2023-04-05 12:34:56'),
                ExampleRow(1, 'yyyz', '2023-04-05 12:34:55')
            ]).with_output_types(ExampleRow)
            | 'Write to jdbc' >> WriteToJdbc(
                driver_class_name='org.postgresql.Driver',
                jdbc_url='jdbc:postgresql://localhost:5432/postgres',
                username='postgres',
                password='postgres',
                table_name='test',
                connection_properties="stringtype=unspecified",
                statement='INSERT INTO test '
                          'VALUES(?,?,?) '
                          'ON CONFLICT (id) '
                          'DO UPDATE SET name = EXCLUDED.name, '
                          'load_date_time = EXCLUDED.load_date_time '
                          'WHERE EXCLUDED.load_date_time::timestamp '
                          '> test.load_date_time::timestamp',
            ))

    My question is: if I want to write a stream that comes from Kafka,
    how can I avoid the JDBC connector inserting the records one by one
    and instead insert the data in time-based batches? Probably
    internally JDBC has some kind of "intelligence" to do this, but I
    want to know what you think about it.

    Thank you!
