I am using the direct runner.

If I remove
.withRate(1, Duration.standardSeconds(5))

then KinesisIO does write to Kinesis, but it receives all the input
records at once and then throws:
*KPL Expiration reached while waiting in limiter*

I suppose we have certain limitations with the direct runner (which I am
only using for writing test cases).
The real pipeline will run on the Flink runner.
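
As a rough illustration of the pacing that withRate(1, Duration.standardSeconds(5)) is meant to produce, here is a minimal sketch in plain Java. It is not Beam's implementation: the class and method names (RateSchedule, expectedOffset, schedule) are hypothetical, and it uses java.time.Duration rather than the Joda Duration that Beam takes.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: computes when each element of a
// bounded sequence would be due if emitted at `numElements` per `period`.
class RateSchedule {

    // Offset from the start at which the element with the given zero-based
    // index is due, for a rate of `numElements` per `period`.
    static Duration expectedOffset(long index, long numElements, Duration period) {
        return period.multipliedBy(index).dividedBy(numElements);
    }

    // Due offsets for the bounded range [from, to).
    static List<Duration> schedule(long from, long to, long numElements, Duration period) {
        List<Duration> offsets = new ArrayList<>();
        for (long i = from; i < to; i++) {
            offsets.add(expectedOffset(i - from, numElements, period));
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Same parameters as the pipeline: 0..100 at 1 element per 5 seconds.
        List<Duration> offsets = schedule(0, 100, 1, Duration.ofSeconds(5));
        System.out.println("first element due after: " + offsets.get(0));
        System.out.println("last element due after:  " + offsets.get(offsets.size() - 1));
    }
}
```

With these parameters the last of the 100 elements would be due about 495 seconds after the first, which is the spacing the downstream HTTP calls and the Kinesis writes are expected to see.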

Thanks
Sachin


On Fri, May 12, 2023 at 9:09 PM Pavel Solomin <p.o.solo...@gmail.com> wrote:

> Hello!
>
> > this does not seem to be generating numbers at that rate which is 1 per
> 5 seconds but all at one time
>
> What runner do you use? I've seen that behavior of GenerateSequence only
> in Direct runner.
>
> > Also looks like it may be creating an unbounded collection and looks
> like kinesis is not writing anything to the stream.
>
> I have never seen that happen, and I have used KinesisIO quite a lot
> recently in my playgrounds, in the same way you do: generating sequences
> and writing to Kinesis. Can you share a full reproducible example of the
> stuck KinesisIO?
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
> On Fri, 12 May 2023 at 15:04, Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hi,
>> I want to emit a bounded sequence of numbers from 0 to n, but have the
>> downstream steps receive this sequence at a given rate.
>>
>> This is needed so that we can rate limit the HTTP request downstream.
>>
>> Say we generate a sequence from 1 to 100: downstream would then make 100
>> such requests at almost the same time.
>>
>> To add gaps, I am trying something like this.
>>
>> Would code like this work?
>> pipeline
>>     .apply(GenerateSequence.from(0).to(100).withRate(1, Duration.standardSeconds(5)))
>>     .apply(ParDo.of(new BatchDataLoad()))
>>     .apply(KinesisIO.write()
>>         .withStreamName(streamName)
>>         // other configs
>>     );
>>
>>
>> Somehow this does not seem to be generating numbers at that rate which is
>> 1 per 5 seconds but all at one time.
>> Also looks like it may be creating an unbounded collection and looks like
>> kinesis is not writing anything to the stream.
>>
>> If not then is there a way to achieve this?
>>
>> Thanks
>> Sachin
>>
>>
