I am using the direct runner.

If I remove the .withRate(1, Duration.standardSeconds(5)) call, then KinesisIO writes to Kinesis; however, it receives all the input records at once and then throws:

*KPL Expiration reached while waiting in limiter*

I suppose the direct runner has certain limitations (I am only using it for writing test cases). The real pipeline will run on the Flink runner.

Thanks
Sachin

On Fri, May 12, 2023 at 9:09 PM Pavel Solomin <p.o.solo...@gmail.com> wrote:

> Hello!
>
> > this does not seem to be generating numbers at that rate which is 1 per
> > 5 seconds but all at one time
>
> What runner do you use? I've seen that behavior of GenerateSequence only
> in Direct runner.
>
> > Also looks like it may be creating an unbounded collection and looks
> > like kinesis is not writing anything to the stream.
>
> Never seen that happening, and I used KinesisIO quite a lot recently in my
> playgrounds - in the same way you use, generating sequences and writing to
> Kinesis. Can you share a full reproducible example of stuck KinesisIO?
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
> On Fri, 12 May 2023 at 15:04, Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hi,
>> I want to emit a bounded sequence of numbers from 0 to n but downstream
>> to receive this sequence at a given rate.
>>
>> This is needed so that we can rate limit the HTTP request downstream.
>>
>> Say if we generate sequence from 1 - 100 then downstream would make 100
>> such requests almost at the same time.
>>
>> So to add gaps I am trying something like this.
>>
>> Would a code like this work ?
>> pipeline
>>     .apply(GenerateSequence.from(0).to(100).withRate(1, Duration.standardSeconds(5)))
>>     .apply(ParDo.of(new BatchDataLoad()))
>>     .apply(KinesisIO.write()
>>         .withStreamName(streamName)
>>         // other configs
>>     );
>>
>> Somehow this does not seem to be generating numbers at that rate which is
>> 1 per 5 seconds but all at one time.
>> Also looks like it may be creating an unbounded collection and looks like
>> kinesis is not writing anything to the stream.
>>
>> If not then is there a way to achieve this?
>>
>> Thanks
>> Sachin
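
On the original goal of spacing out the downstream HTTP requests: one option that does not depend on how a runner paces GenerateSequence is to throttle inside the step that makes the requests. Below is a minimal plain-Java sketch of that idea. The Throttle class and its acquire() method are hypothetical names made up for illustration; they are not part of Apache Beam or KinesisIO.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not part of Apache Beam): lets one caller through
 * per interval and blocks later callers until their turn comes.
 */
final class Throttle {
    private final long intervalNanos;
    private long nextAllowedNanos;
    private boolean started = false;

    Throttle(long intervalMillis) {
        if (intervalMillis < 0) {
            throw new IllegalArgumentException("intervalMillis must be >= 0");
        }
        this.intervalNanos = TimeUnit.MILLISECONDS.toNanos(intervalMillis);
    }

    /** Blocks until at least one interval has passed since the previous call returned. */
    synchronized void acquire() {
        long now = System.nanoTime();
        if (started) {
            // Compare via subtraction because nanoTime values may wrap around.
            while (nextAllowedNanos - now > 0) {
                try {
                    TimeUnit.NANOSECONDS.sleep(nextAllowedNanos - now);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up waiting.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while throttling", e);
                }
                now = System.nanoTime();
            }
        }
        started = true;
        nextAllowedNanos = now + intervalNanos;
    }
}
```

Assuming the HTTP call lives in BatchDataLoad, that DoFn could hold one Throttle and call acquire() before each request. Note that this only limits the rate per Throttle instance: if the runner executes several copies of the DoFn in parallel, the overall request rate is multiplied by the number of copies.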