Reading from an RDBMS and processing the data downstream in your pipeline are not 
the same in terms of bundling. 

The main “issue" with a former is that it reads mostly in a single thread per 
SQL-query and JDBC client is not exception.  So, Beam can’t split data, that 
are not yet read, into bundles. 

How many records it will keep in memory is up to the corresponding JDBC driver, 
and the “fetch size” is just a hint for it. So it depends on the driver, not on 
Beam, in this case.
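
For example, here is a minimal sketch of passing that hint through the built-in 
JdbcIO connector (the driver, URL, credentials, query and row type below are 
just placeholder assumptions, not your actual setup):

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.coders.KvCoder;
  import org.apache.beam.sdk.coders.StringUtf8Coder;
  import org.apache.beam.sdk.coders.VarIntCoder;
  import org.apache.beam.sdk.io.jdbc.JdbcIO;
  import org.apache.beam.sdk.values.KV;

  public class FetchSizeSketch {
    public static void main(String[] args) {
      Pipeline p = Pipeline.create();

      p.apply(
          JdbcIO.<KV<Integer, String>>read()
              .withDataSourceConfiguration(
                  JdbcIO.DataSourceConfiguration.create(
                          "org.postgresql.Driver",
                          "jdbc:postgresql://localhost:5432/mydb") // placeholder URL
                      .withUsername("user")
                      .withPassword("password"))
              .withQuery("SELECT id, name FROM my_table")          // placeholder query
              // Only a hint: the JDBC driver decides how many rows it buffers per round trip.
              .withFetchSize(1_000)
              .withRowMapper(rs -> KV.of(rs.getInt("id"), rs.getString("name")))
              .withCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())));

      p.run().waitUntilFinish();
    }
  }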

As a workaround, as I suggested above, a “partitioned” version of the read was 
developed for JdbcIO; it has some weaknesses but may help in some situations.
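
Roughly, it looks like this (only a sketch: the table name, partition column and 
number of partitions are hypothetical, and the partition column must be numeric 
or date-like so range queries can be built over it):

  // Assumes the same placeholder DataSourceConfiguration as in the sketch above.
  p.apply(
      JdbcIO.<KV<Integer, String>>readWithPartitions()   // default Long partition column type
          .withDataSourceConfiguration(dataSourceConfig)
          .withTable("my_table")                         // placeholder table
          .withPartitionColumn("id")                     // column used to build the ranges
          .withNumPartitions(10)                         // number of range queries to run
          .withRowMapper(rs -> KV.of(rs.getInt("id"), rs.getString("name")))
          .withCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())));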

—
Alexey

> On 17 Jul 2023, at 20:29, Yomal de Silva <yomal.prav...@gmail.com> wrote:
> 
> Hi Alexey,
> 
> Yes, I have tried changing the fetch size for my implementation. What I 
> observed through the Flink dashboard was that the reading transform gets completed 
> quickly and one of the other transforms takes a much longer time (due to some 
> logic). 
> 
> Even if Apache Beam processes data in bundles when reading from a data source 
> like a database, it would not wait until a single bundle reaches the end of the 
> pipeline. Is that understanding correct? So it will eventually read the 
> entire dataset, loading it into memory. 
> 
> I haven't tried the 2nd option you suggested. Will try it out. 
> 
> Thank you
> 
> On Mon, Jul 17, 2023 at 10:08 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>> Hi Yomal,
>> 
>> Actually, usually all data in a Beam pipeline is processed in bundles (or 
>> chunks) if it is processed by a DoFn. The size of the bundle is up to your 
>> processing engine and, IIRC, there is no way in Beam to change it.
>> 
>> Talking about your case - did you try to change the fetch size for Beam’s 
>> JdbcIO connector or for your own one?
>> Normally, it just gives the JDBC driver a hint as to the number of rows 
>> that should be fetched from the database [1].
>> 
>> Another option could be to try to read data with JdbcIO.readWithPartitions(), 
>> which will execute several instances of the query on the same table
>> using ranges [2].
>> 
>> —
>> Alexey
>> 
>> [1] 
>> https://github.com/apache/beam/blob/c8f68f92097de33fe2c6863344404a1b9922ae27/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1524
>> [2] 
>> https://beam.apache.org/releases/javadoc/2.49.0/org/apache/beam/sdk/io/jdbc/JdbcIO.html#readWithPartitions-org.apache.beam.sdk.values.TypeDescriptor-
>> 
>>> On 17 Jul 2023, at 13:33, Yomal de Silva <yomal.prav...@gmail.com> wrote:
>>> 
>>> Hi all,
>>> 
>>> I have a pipeline which reads data from a database (PostgreSQL), enriches the 
>>> data through a side input and finally publishes the results to Kafka. 
>>> Currently I am not using the built-in JdbcIO to read the data, but I think 
>>> there won't be any difference in using that. With my implementation I have 
>>> set the fetch size and pass the data to the next transform to process. I 
>>> have 2 questions here:
>>> 
>>> 1. For batch-based processing pipelines, is there a way to process elements 
>>> in chunks rather than reading the entire dataset and loading it into 
>>> memory? What I have observed is that it occupies a significant amount of 
>>> memory and may even cause OOM exceptions. I am looking for some sort of a 
>>> backpressure implementation, or any other way to stop reading all the data 
>>> into memory until some of the records get processed. I have found the 
>>> following answer [1] which states that's not possible; since this answer was 
>>> provided some time ago, I wanted to check if it is still the case.
>>> 
>>> 2. When dealing with side inputs, again, does it load everything into 
>>> memory and use the appropriate window to carry out the operation inside a 
>>> transform? 
>>> 
>>> Please let me know if you have any solutions for this. 
>>> 
>>> [1] 
>>> https://stackoverflow.com/questions/57580362/how-to-manage-backpressure-with-apache-beam
>>> 
>>> Thank you.
>> 
