Hi Pavel,
I tried the setting you suggested, but it does not seem to work for my case.
It looks like this setting bounds the unbounded stream until the specified
time, then closes the stream, and only after that are the records processed
downstream.

In my case, withMaxReadTime is set to 3 minutes.

The logs look like this:
------------------------------------------------------------------------
13:06:38.267 [direct-runner-worker] INFO a.p.f.b.c.k.ShardReadersPool - Starting to read test-batch stream from [shardId-000000000000] shards
13:09:39.002 [direct-runner-worker] INFO a.p.f.b.c.k.ShardReadersPool - Closing shard iterators pool
13:09:42.632 [pool-16-thread-1] INFO a.p.f.b.c.k.ShardReadersPool - Kinesis Shard read loop has finished


As you can see, it reads all the records, then closes the reader, and only
once the read loop has finished do the downstream operators process these
records further.

So if, downstream, I write back to the same Kinesis stream, those records are
never read, because the read loop is already closed.

I guess I still need the unbounded stream, so that it keeps listening for any
fresh records added to the stream, but I would like the Beam application to
shut down after a specified time.

Would something like this work?

pipeline.run().waitUntilFinish(Duration.standardMinutes(3));

This would run the pipeline for 3 minutes and then shut it down, and by
then it would have computed all the derived data.
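On the waitUntilFinish(Duration.standardMinutes(3)) idea: as far as I can tell from the PipelineResult Javadoc, the timed variant only bounds how long the caller blocks; on timeout it returns null and the pipeline keeps running, so the driver probably has to call cancel() itself. Below is a minimal sketch of that wait-then-cancel pattern. To keep it runnable with the JDK alone, it uses a hypothetical Job interface as a stand-in for PipelineResult and a toy EndlessJob in place of the unbounded Kinesis read; the Beam calls it corresponds to are noted in a comment, and the millisecond timeout stands in for the 3 minutes.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// With Beam on the classpath, the corresponding driver code would be roughly:
//   PipelineResult result = pipeline.run();
//   PipelineResult.State state = result.waitUntilFinish(Duration.standardMinutes(3)); // Joda Duration
//   if (state == null || !state.isTerminal()) {
//     result.cancel(); // declared to throw IOException
//   }
public class DeadlineShutdown {

  /** Hypothetical stand-in for Beam's PipelineResult; not the real interface. */
  public interface Job {
    /** Like waitUntilFinish(Duration): true if the job finished within the timeout. */
    boolean awaitFinish(Duration timeout);

    /** Like cancel(): asks the runner to stop the job. */
    void cancel();
  }

  /** Waits at most {@code deadline}; cancels the job if still running. Returns true if it cancelled. */
  public static boolean runForAtMost(Job job, Duration deadline) {
    if (job.awaitFinish(deadline)) {
      return false; // finished on its own, nothing to stop
    }
    job.cancel(); // the timed wait by itself leaves an unbounded job running
    return true;
  }

  /** Toy job that never finishes by itself, like an unbounded Kinesis read. */
  public static final class EndlessJob implements Job {
    private final CountDownLatch stopped = new CountDownLatch(1);
    public volatile boolean cancelled = false;

    @Override
    public boolean awaitFinish(Duration timeout) {
      try {
        return stopped.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt status for the caller
        return false;
      }
    }

    @Override
    public void cancel() {
      cancelled = true;
      stopped.countDown();
    }
  }

  public static void main(String[] args) {
    EndlessJob job = new EndlessJob();
    // 100 ms stands in for Duration.standardMinutes(3).
    boolean cancelled = runForAtMost(job, Duration.ofMillis(100));
    System.out.println("cancel() called: " + cancelled + ", job cancelled: " + job.cancelled);
  }
}
```

Note that cancelling is not the same as draining: elements still in flight when cancel() runs are not guaranteed to be fully processed or written, so whether all derived data is computed by the deadline would need to be checked separately.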


Thanks

Sachin



On Thu, May 4, 2023 at 11:26 PM Pavel Solomin <p.o.solo...@gmail.com> wrote:

> Hello!
>
> In the case of KinesisIO, there is a parameter you can set: withMaxReadTime.
> I think many IOs implement it. It does not check whether there is still some
> outstanding data; it simply finishes the pipeline when this time is over.
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
> On Thu, 4 May 2023 at 18:20, Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hi,
>> I am building a kind of batch/streaming hybrid Beam application.
>> Data is fed into a Kinesis stream, and then the Beam pipeline is run.
>>
>> I want to stop the pipeline if no new data is fed into the stream for a
>> certain period of time, say 5 minutes.
>>
>> Is there a way to achieve this?
>>
>> Right now I only see something like this:
>>
>> pipeline.run().waitUntilFinish();
>>
>> Instead of waiting until the pipeline finishes, is there any way to have a
>> conditional finish?
>>
>> Thanks
>> Sachin
>>
>
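
For the original question (stop once no new data has arrived for, say, 5 minutes), one possible driver-side approach is a watchdog: poll a count of processed records and cancel the job once that count has stopped changing for the whole idle window. In Beam, the count could presumably come from a user counter (Metrics.counter(...)) incremented in a DoFn and read back through PipelineResult.metrics(), assuming the runner reports attempted metric values while the job is still running. The sketch below abstracts both the counter and cancel() as plain JDK hooks (LongSupplier, Runnable); cancelWhenIdle is a hypothetical helper, not a Beam API, and the millisecond values stand in for minutes.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class IdleWatchdog {

  /**
   * Polls processedCount every pollInterval. Once the count has not changed for at least
   * idleTimeout, runs cancel once and returns true. Returns false if interrupted first.
   * Both hooks are hypothetical stand-ins (e.g. a Beam counter and PipelineResult.cancel()).
   */
  public static boolean cancelWhenIdle(LongSupplier processedCount, Runnable cancel,
                                       Duration idleTimeout, Duration pollInterval) {
    long lastCount = processedCount.getAsLong();
    long lastChangeNanos = System.nanoTime();
    while (true) {
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // keep the interrupt status for the caller
        return false;
      }
      long current = processedCount.getAsLong();
      long now = System.nanoTime();
      if (current != lastCount) {
        // New records were processed: restart the idle clock.
        lastCount = current;
        lastChangeNanos = now;
      } else if (now - lastChangeNanos >= idleTimeout.toNanos()) {
        // No progress during the whole idle window: stop the job.
        cancel.run();
        return true;
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicLong processed = new AtomicLong();
    // Simulated source: five records arrive 10 ms apart, then nothing more.
    Thread producer = new Thread(() -> {
      for (int i = 0; i < 5; i++) {
        processed.incrementAndGet();
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          return;
        }
      }
    });
    producer.start();
    // 200 ms stands in for the 5-minute idle window.
    boolean cancelled = cancelWhenIdle(processed::get,
        () -> System.out.println("cancelling after " + processed.get() + " records"),
        Duration.ofMillis(200), Duration.ofMillis(20));
    producer.join();
    System.out.println("cancelled = " + cancelled);
  }
}
```

Polling from the driver keeps the pipeline itself unchanged; the trade-off is that shutdown can lag the idle window by up to one poll interval.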
