Hi Yomal,

Actually, data in a Beam pipeline is usually processed in bundles (or chunks)
when it passes through a DoFn. The size of a bundle is up to your processing
engine (the runner) and, iirc, there is no way in Beam to change it.

Regarding your case: did you try changing the fetch size for Beam’s JdbcIO
connector or for your own one?
Normally, it just gives the JDBC driver a hint as to the number of rows
that should be fetched from the database per round trip [1].
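
To illustrate the fetch-size hint for a custom reader, here is a minimal sketch
in plain JDBC. The names FetchSizeSketch, streamRows and RowHandler are made up
for this example and are not part of Beam. One thing to check with PostgreSQL:
as far as I know, its driver only fetches rows in batches through a cursor when
autocommit is disabled; otherwise it may still buffer the whole result set
regardless of the fetch size. With JdbcIO, the equivalent setting should be
withFetchSize() on the read transform [1].

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

final class FetchSizeSketch {

    /** Hypothetical callback invoked once for each row of the result set. */
    interface RowHandler {
        void handle(ResultSet row) throws SQLException;
    }

    /**
     * Runs the query and hands rows to the handler one at a time, asking the
     * driver to fetch them in batches of fetchSize. Returns the row count.
     */
    static long streamRows(Connection conn, String sql, int fetchSize, RowHandler handler)
            throws SQLException {
        // Assumption: a PostgreSQL-style driver that only uses a server-side
        // cursor (and so honours the fetch size) when autocommit is off.
        conn.setAutoCommit(false);
        long count = 0;
        try (PreparedStatement stmt = conn.prepareStatement(
                sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            // A hint only: the driver decides how many rows it really fetches.
            stmt.setFetchSize(fetchSize);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    handler.handle(rs);
                    count++;
                }
            }
        }
        return count;
    }
}
```

The handler only sees the current row, so memory use on the reading side is
bounded by roughly one batch of rows, provided the driver respects the hint.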

Another option could be to read the data with JdbcIO.readWithPartitions(),
which executes several instances of the query on the same table
using ranges [2].
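
To make the idea of "ranges" concrete, here is a small sketch of how a key
range could be split into several smaller queries. This is only an illustration
of the principle, not JdbcIO's actual implementation; the class
RangePartitionSketch, its methods, and the table and column names are all
hypothetical. In JdbcIO itself the partition column, the bounds and the number
of partitions are configured on the transform; see the Javadoc [2] for the
exact options.

```java
import java.util.ArrayList;
import java.util.List;

final class RangePartitionSketch {

    /**
     * Splits the inclusive key range [lower, upper] into at most numPartitions
     * contiguous half-open ranges {start, endExclusive}.
     * Assumes upper is less than Long.MAX_VALUE so that upper + 1 does not overflow.
     */
    static List<long[]> splitRange(long lower, long upper, int numPartitions) {
        if (numPartitions < 1 || upper < lower) {
            throw new IllegalArgumentException("invalid range or partition count");
        }
        List<long[]> ranges = new ArrayList<>();
        long endExclusive = upper + 1;
        long total = endExclusive - lower;
        // Ceiling division so the ranges cover every key.
        long stride = (total + numPartitions - 1) / numPartitions;
        for (long start = lower; start < endExclusive; start += stride) {
            ranges.add(new long[] {start, Math.min(start + stride, endExclusive)});
        }
        return ranges;
    }

    /** Builds the query for one range; each query can be run independently. */
    static String toQuery(String table, String column, long[] range) {
        return "SELECT * FROM " + table
            + " WHERE " + column + " >= " + range[0]
            + " AND " + column + " < " + range[1];
    }
}
```

For example, splitRange(1, 10, 3) yields the ranges [1, 5), [5, 9) and [9, 11),
and each one becomes a separate query that reads only its slice of the table,
so no single reader has to hold the whole dataset.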

—
Alexey

[1] https://github.com/apache/beam/blob/c8f68f92097de33fe2c6863344404a1b9922ae27/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1524
[2] https://beam.apache.org/releases/javadoc/2.49.0/org/apache/beam/sdk/io/jdbc/JdbcIO.html#readWithPartitions-org.apache.beam.sdk.values.TypeDescriptor-

> On 17 Jul 2023, at 13:33, Yomal de Silva <yomal.prav...@gmail.com> wrote:
> 
> Hi all,
> 
> I have a pipeline which reads data from a database (PostgreSQL), enriches the
> data through a side input and finally publishes the results to Kafka. Currently
> I am not using the built-in JdbcIO to read the data, but I think there won't be
> any difference in using that. With my implementation I have set the fetch size
> and pass the data to the next transform to process. I have 2 questions here:
> 
> 1. For batch-based processing pipelines, is there a way to process elements in
> chunks rather than reading the entire dataset and loading it into memory?
> What I have observed is that it occupies a significant amount of memory and
> may even cause OOM exceptions. I am looking for some sort of backpressure
> implementation or any other way to stop reading all the data into memory
> until some of the records get processed. I have found the following answer
> [1], which states that's not possible. Since this answer was provided some time
> ago, I wanted to check if it is still the case.
> 
> 2. When dealing with side inputs, again, does it load everything into memory
> and use the appropriate window to carry out the operation inside a transform?
> 
> Please let me know if you have any solutions for this. 
> 
> [1] 
> https://stackoverflow.com/questions/57580362/how-to-manage-backpressure-with-apache-beam
> 
> Thank you.
