Hi Pavel, I have tried the setting you have suggested but this does not seem to work. Looks like this setting bounds the unbounded stream till the specified time and then closes the stream and it is only after that the records are applied downstream.
In my case withMaxReadTime = 3 minutes Logs are like this: ------------------------------------------------------------------------ 13:06:38.267 [direct-runner-worker] INFO a.p.f.b.c.k.ShardReadersPool - Starting to read test-batch stream from [shardId-000000000000] shards 13:09:39.002 [direct-runner-worker] INFO a.p.f.b.c.k.ShardReadersPool - Closing shard iterators pool 13:09:42.632 [pool-16-thread-1] INFO a.p.f.b.c.k.ShardReadersPool - Kinesis Shard read loop has finished As you can see, it reads all the records and then closes the reader and only when the read loop is finished, the downstream operators process these records further. So if downstream I again right to the same Kinesis stream, it is never read as the loop is closed. I guess I may still need the unbounded stream so it continues to listen to any fresh records added to the stream, but I would like the beam application to shutdown after a specified time. Would some like this work: pipeline.run().waitUntilFinish(Duration.standardMinutes(3)); This will run the pipeline for 3 minutes and then shuts it down and by then it has computed all the derived data. Thanks Sachin On Thu, May 4, 2023 at 11:26 PM Pavel Solomin <p.o.solo...@gmail.com> wrote: > Hello! > > In case of KinesisIO there is a param which you can set - withMaxReadTime > I think many IOs implement it. It does not check if there is still some > outstanding data, simply finishes the pipeline when this time is over. > > Best Regards, > Pavel Solomin > > Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin > <https://www.linkedin.com/in/pavelsolomin> > > > > > > On Thu, 4 May 2023 at 18:20, Sachin Mittal <sjmit...@gmail.com> wrote: > >> Hi, >> I am kind of building a batch/streaming hybrid beam application. >> Data is fed into a kinesis stream and the beam pipeline is run. >> >> I want to stop the pipeline if no new data is fed into the stream for a >> certain period of time, say 5 minutes. >> >> Is there a way of achieving this ? >> >> Right now I only see something like this: >> >> pipeline.run().waitUntilFinish(); >> >> Instead of waiting until finish can we have some conditional finish in >> any way ? >> >> Thanks >> Sachin >> >