Hi Feba, I can't say for sure *where* your pipeline is running out of memory, but my guess is that it's because CassandraIO currently can only read an entire table, or have a single query attached. So if you are calling CassandraIO.read() to grab all the "user data", it's going to load as much of it as possible.
I have a pull request to add a readAll() method to CassandraIO that should allow you to do what you want. Ismaël and I have been working on it on and off for quite some time, but I'm hoping to get it merged this month. The way readAll() works is that it receives as INPUT the query/data that needs to be retrieved from Cassandra, so it can be used beyond just the first step of the pipeline. We use this quite a lot (from my branch) at my current gig for pipelines similar to yours. Here is what it would look like:

CassandraIO.<User>read()
  ---> MapElements into a query for readAll()
  ---> CassandraIO.<UserData>readAll()
  ---> aggregation of user data
  ---> output

This way, if you only have one thread doing the aggregation of user data, you'll in effect be processing one user at a time.

I'm not sure exactly when readAll() will be merged, but you could also write your own connector that does something similar by copying my code (or taking inspiration from it, etc.).

*~Vincent*

On Tue, Dec 15, 2020 at 1:11 AM Feba Fathima <[email protected]> wrote:

> Hi,
>
> We are creating a beam pipeline to do batch processing of data bundles.
> The pipeline reads records using CassandraIO. We want to process the
> data in batches of 30 min, then group/stitch the 30 min data and write
> it to another table. I have 300 bundles for each employee and we need to
> process at least 50 employees using limited resources (~2Gi). But
> currently the heap usage is so high that we are only able to process 1
> employee (with ~4Gi). If we give it more data we get Out of memory/Heap
> errors.
>
> Is there a way to process 1 employee at a time? Like a loop, so that we
> can process all employees sequentially with our ~2Gi.
>
> We have also posted the same question on Stack Overflow and did not get
> any help there either.
>
> https://stackoverflow.com/questions/65274909/looping-inside-a-beam-transform-process-sequentially-using-apache-beam
>
> Kindly guide us through this if someone is familiar with the scenario.
>
> --
> Thanks & Regards,
> Feba Fathima
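P.S. For anyone reading along: the per-user fan-out shape I described above can be sketched in plain Java, with no Beam dependency. This is only an analogy of the dataflow, not the actual CassandraIO API from my branch; fetchUserData and aggregate are hypothetical stand-ins for CassandraIO.<UserData>readAll() and the aggregation transform.

```java
import java.util.*;

// Plain-Java sketch of the readAll() fan-out described above.
// Only one user's rows are resident in memory at any time, which is
// the property that keeps heap usage bounded.
public class PerUserPipeline {

    // Stand-in for CassandraIO.<UserData>readAll(): given one user's
    // "query", return only that user's rows.
    static List<Integer> fetchUserData(String user, Map<String, List<Integer>> table) {
        return table.getOrDefault(user, List.of());
    }

    // Stand-in for the aggregation step: here, a simple sum per user.
    static int aggregate(List<Integer> rows) {
        return rows.stream().mapToInt(Integer::intValue).sum();
    }

    // Drives the flow: user list -> per-user read -> per-user
    // aggregation -> output map, one user at a time.
    static Map<String, Integer> run(List<String> users, Map<String, List<Integer>> table) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (String user : users) {              // sequential, one user at a time
            List<Integer> rows = fetchUserData(user, table);
            out.put(user, aggregate(rows));      // rows are collectible after this
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> table = Map.of(
            "alice", List.of(1, 2, 3),
            "bob", List.of(10, 20));
        System.out.println(run(List.of("alice", "bob"), table));
        // prints {alice=6, bob=30}
    }
}
```

In Beam terms, the loop body is what a worker does for each element flowing into the readAll()/aggregation stage; with a single thread there, the effect is the same sequential per-user processing.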
