Hi community. I have a question about how to read a stream from Kafka and write batches of data with the JDBC connector. The idea is to overwrite a specific row when the row we want to insert has the same id and a greater load_date_time. The conceptual pipeline looks like this and it is working (keep in mind that the real source will be a stream from Kafka):
import typing

import apache_beam as beam
from apache_beam.io.jdbc import WriteToJdbc

ExampleRow = typing.NamedTuple('ExampleRow', id=int, name=str, load_date_time=str)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([
            ExampleRow(1, 'zzzz', '2023-04-05 12:34:56'),
            ExampleRow(1, 'yyyz', '2023-04-05 12:34:55'),
        ]).with_output_types(ExampleRow)
        | 'Write to jdbc' >> WriteToJdbc(
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:postgresql://localhost:5432/postgres',
            username='postgres',
            password='postgres',
            table_name='test',
            connection_properties='stringtype=unspecified',
            statement='INSERT INTO test '
                      'VALUES(?,?,?) '
                      'ON CONFLICT (id) '
                      'DO UPDATE SET name = EXCLUDED.name, load_date_time = EXCLUDED.load_date_time '
                      'WHERE EXCLUDED.load_date_time::timestamp > test.load_date_time::timestamp',
        )
    )

My question is: if I want to write a stream that comes from Kafka, how can I avoid the JDBC connector inserting the records one by one and instead insert the data in time-based batches? Maybe JdbcIO already has some kind of batching built in, but I would like to know what you think about it. Thank you!
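To make the intent concrete, here is roughly the shape of pipeline I have in mind once the source is Kafka. This is only a sketch: the topic name, bootstrap servers, and the JSON layout parsed in to_example_row are placeholders I made up, and I am not sure whether grouping into fixed windows like this actually makes WriteToJdbc flush the records in per-window batches, which is essentially my question:

import json
import typing

import apache_beam as beam
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

ExampleRow = typing.NamedTuple('ExampleRow', id=int, name=str, load_date_time=str)


def to_example_row(kv):
    # kv is a (key, value) pair of raw bytes coming from Kafka; the JSON
    # layout of the value is an assumption made for this sketch.
    payload = json.loads(kv[1].decode('utf-8'))
    return ExampleRow(payload['id'], payload['name'], payload['load_date_time'])


options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:9092'},
            topics=['test-topic'])
        | 'Parse' >> beam.Map(to_example_row).with_output_types(ExampleRow)
        # Group the unbounded stream into 60-second fixed windows so that each
        # window's worth of records reaches the sink together instead of as a
        # continuous element-by-element flow.
        | 'Window' >> beam.WindowInto(window.FixedWindows(60))
        | 'Write to jdbc' >> WriteToJdbc(
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:postgresql://localhost:5432/postgres',
            username='postgres',
            password='postgres',
            table_name='test',
            connection_properties='stringtype=unspecified',
            statement='INSERT INTO test '
                      'VALUES(?,?,?) '
                      'ON CONFLICT (id) '
                      'DO UPDATE SET name = EXCLUDED.name, load_date_time = EXCLUDED.load_date_time '
                      'WHERE EXCLUDED.load_date_time::timestamp > test.load_date_time::timestamp',
        )
    )

I have also seen that the Java JdbcIO.Write transform exposes a withBatchSize option, but I do not know if or how that is reachable from the Python WriteToJdbc wrapper.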