Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

2023-05-12 Thread Sachin Mittal
Only direct runner.

I have now disabled aggregation on the KPL and it looks to be working.
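
For reference, a minimal sketch of that change - assuming the legacy org.apache.beam.sdk.io.kinesis.KinesisIO writer, whose withProducerProperties() accepts raw KPL configuration, and the standard KPL key "AggregationEnabled"; pipeline and streamName are illustrative:

--
import java.util.Properties;
import org.apache.beam.sdk.io.kinesis.KinesisIO;

Properties producerProps = new Properties();
// Standard KPL key: "false" turns off record aggregation, so each user
// record is sent as its own Kinesis record instead of being packed.
producerProps.setProperty("AggregationEnabled", "false");

pipeline
    .apply(/* ... produce byte[] elements ... */)
    .apply(KinesisIO.write()
        .withStreamName(streamName)
        .withProducerProperties(producerProps)
        // other configs
    );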

On Sat, 13 May 2023 at 3:35 AM, Pavel Solomin  wrote:

> > Hundreds of thousands of data records are accumulated and pushed to
> > Kinesis all at once
>
> Does that happen only in direct runner? Or Flink runner behaves similarly?
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> 
>
>
>
>
>
> On Fri, 12 May 2023 at 16:43, Sachin Mittal  wrote:
>
>> Hi,
>> So I have prepared the write pipeline something like this:
>>
>>
>> --
>> writePipeline
>>     .apply(GenerateSequence.from(0).to(100))
>>     .apply(ParDo.of(new DoFn<Long, byte[]>() {
>>       @ProcessElement
>>       public void processElement(ProcessContext c) {
>>         long i = c.element();
>>         // Fetching data for step=i
>>         List<Data> data = fetchForInputStep(i);
>>         // output all the data one by one
>>         for (Data d : data) {
>>           c.output(d.asBytes());
>>         }
>>       }
>>     }))
>>     .apply(KinesisIO.write()
>>         .withStreamName(streamName)
>>         // other configs
>>     );
>>
>> writePipeline.run().waitUntilFinish();
>>
>> What I observe is that the part of the pipeline that pushes data to
>> Kinesis only runs after all the data has been loaded by the second apply
>> step. So hundreds of thousands of data records accumulate and are pushed
>> to Kinesis all at once, and we get the following error:
>> *KPL Expiration reached while waiting in limiter*
>>
>> The logs are generated like this:
>>
>> --
>> Extracting binaries to
>> /var/folders/30/knyj9z4d3psbd4s6kffqc500gn/T/amazon-kinesis-producer-native-binaries
>> .
>> [main.cc:384] Starting up main producer
>> .
>> [main.cc:395] Entering join
>> .
>> Fetching data for step=1
>> .
>> Fetching data for step=100
>> .
>> [kinesis_producer.cc:200] Created pipeline for stream "xx"
>> [shard_map.cc:87] Updating shard map for stream "xx"
>> [shard_map.cc:148] Successfully updated shard map for stream "xx"
>> found 1 shards
>> [processing_statistics_logger.cc:111] Stage 1 Triggers: { stream:
>> 'xx', manual: 10, count: 0, size: 4688, matches: 0, timed: 0,
>> UserRecords: 742018, KinesisRecords: 4698 }
>>
>>
>> I had assumed that as soon as the data for step 1 was fetched it would be
>> passed downstream, and that the Kinesis producer pipeline would have been
>> created and started writing to Kinesis much earlier, but this happens
>> only after all the data is collected.
>>
>> Is there a way to fix this?
>>
>> Thanks
>> Sachin
>>
>>
>>
>> On Wed, May 10, 2023 at 4:29 PM Pavel Solomin 
>> wrote:
>>
>>> > two pipeline objects in my application
>>>
>>> I think this should work. I meant to have 2 separate artifacts and
>>> deploy them separately, but if your app runs batch processing with 2
>>> sequential steps, 2 pipelines should work too:
>>>
>>> - writePipeline.run().waitUntilFinish()
>>> - readAndWritePipeline.run().waitUntilFinish()
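
For illustration, a minimal sketch of that two-pipeline shape, assuming bounded inputs; names are illustrative. Since a Kinesis read is unbounded by default, the second pipeline is bounded here with withMaxNumRecords so that it can also finish:

--
Pipeline writePipeline = Pipeline.create(options);
// ... build the write-to-Kinesis steps ...
writePipeline.run().waitUntilFinish(); // blocks until the batch write completes

Pipeline readAndWritePipeline = Pipeline.create(options);
readAndWritePipeline
    .apply(KinesisIO.read()
        .withStreamName(streamName)
        .withMaxNumRecords(expectedCount) // illustrative bound so the read ends
        /* other configs */)
    .apply(/* ... compute derived data and write back ... */);
readAndWritePipeline.run().waitUntilFinish();
// Both runs block, so the application exits after the second one returns.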
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> 
>>>
>>>
>>>
>>>
>>>
>>> On Wed, 10 May 2023 at 11:49, Sachin Mittal  wrote:
>>>
 Use case is something like this:
 A source writes data to Kinesis, and the same data is used to compute
 derived data, which is written back to the same stream so the next level
 of derived data can be computed from the previous level, and so on.

 Would there be any issues from the Beam side in doing this within a
 single pipeline?

 When you say I have to split my app into two, do you mean that I have to
 create two pipeline objects in my application?

 If so, how will the application end?

 Note that the source is of finite size, and it gets written into Kinesis.

 Also, we do plan to migrate to the aws2 IO, but later. If aws1 has some
 limitations in achieving what we want, then please let me know.

 Thanks


 On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin 
 wrote:

> Hello!
>
> I've never seen use-cases where it would be necessary. What are you
> trying to achieve? Some context would be helpful.
> Your example looks like you can split your app into two - one writes
> into streamName and the other reads from streamName.
>
> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is a legacy connector and
> is no longer maintained. Better to use this instead:
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> 

Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

2023-05-12 Thread Pavel Solomin
> Hundreds of thousands of data records are accumulated and pushed to
> Kinesis all at once

Does that happen only in direct runner? Or Flink runner behaves similarly?

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin






On Fri, 12 May 2023 at 16:43, Sachin Mittal  wrote:

> Hi,
> So I have prepared the write pipeline something like this:
>
>
> --
> writePipeline
>     .apply(GenerateSequence.from(0).to(100))
>     .apply(ParDo.of(new DoFn<Long, byte[]>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         long i = c.element();
>         // Fetching data for step=i
>         List<Data> data = fetchForInputStep(i);
>         // output all the data one by one
>         for (Data d : data) {
>           c.output(d.asBytes());
>         }
>       }
>     }))
>     .apply(KinesisIO.write()
>         .withStreamName(streamName)
>         // other configs
>     );
>
> writePipeline.run().waitUntilFinish();
>
> What I observe is that the part of the pipeline that pushes data to
> Kinesis only runs after all the data has been loaded by the second apply
> step. So hundreds of thousands of data records accumulate and are pushed
> to Kinesis all at once, and we get the following error:
> *KPL Expiration reached while waiting in limiter*
>
> The logs are generated like this:
>
> --
> Extracting binaries to
> /var/folders/30/knyj9z4d3psbd4s6kffqc500gn/T/amazon-kinesis-producer-native-binaries
> .
> [main.cc:384] Starting up main producer
> .
> [main.cc:395] Entering join
> .
> Fetching data for step=1
> .
> Fetching data for step=100
> .
> [kinesis_producer.cc:200] Created pipeline for stream "xx"
> [shard_map.cc:87] Updating shard map for stream "xx"
> [shard_map.cc:148] Successfully updated shard map for stream "xx"
> found 1 shards
> [processing_statistics_logger.cc:111] Stage 1 Triggers: { stream:
> 'xx', manual: 10, count: 0, size: 4688, matches: 0, timed: 0,
> UserRecords: 742018, KinesisRecords: 4698 }
>
>
> I had assumed that as soon as the data for step 1 was fetched it would be
> passed downstream, and that the Kinesis producer pipeline would have been
> created and started writing to Kinesis much earlier, but this happens
> only after all the data is collected.
>
> Is there a way to fix this?
>
> Thanks
> Sachin
>
>
>
> On Wed, May 10, 2023 at 4:29 PM Pavel Solomin 
> wrote:
>
>> > two pipeline objects in my application
>>
>> I think this should work. I meant to have 2 separate artifacts and deploy
>> them separately, but if your app runs batch processing with 2 sequential
>> steps, 2 pipelines should work too:
>>
>> - writePipeline.run().waitUntilFinish()
>> - readAndWritePipeline.run().waitUntilFinish()
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>> 
>>
>>
>>
>>
>>
>> On Wed, 10 May 2023 at 11:49, Sachin Mittal  wrote:
>>
>>> Use case is something like this:
>>> A source writes data to Kinesis, and the same data is used to compute
>>> derived data, which is written back to the same stream so the next level
>>> of derived data can be computed from the previous level, and so on.
>>>
>>> Would there be any issues from the Beam side in doing this within a
>>> single pipeline?
>>>
>>> When you say I have to split my app into two, do you mean that I have to
>>> create two pipeline objects in my application?
>>>
>>> If so, how will the application end?
>>>
>>> Note that the source is of finite size, and it gets written into Kinesis.
>>>
>>> Also, we do plan to migrate to the aws2 IO, but later. If aws1 has some
>>> limitations in achieving what we want, then please let me know.
>>>
>>> Thanks
>>>
>>>
>>> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin 
>>> wrote:
>>>
 Hello!

 I've never seen use-cases where it would be necessary. What are you
 trying to achieve? Some context would be helpful.
 Your example looks like you can split your app into two - one writes
 into streamName and the other reads from streamName.

 P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is a legacy connector and
 is no longer maintained. Better to use this instead:
 https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html

 Best Regards,
 Pavel Solomin

 Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
 





 On Wed, 10 May 2023 at 10:50, Sachin Mittal  wrote:

> Hi,
> I am using the Beam AWS SDK v1 IO to read from and write to a Kinesis stream.
> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>
>
> My pipeline is something like this: (*note the kinesis stream used to
>

Question about metrics

2023-05-12 Thread hsy...@gmail.com
Hi, I have a question about metrics. I want to use the Beam metrics API to
send metrics to GCP Monitoring. Instead of collecting just simple numeric
values, I also need to send labels along with them. Is there a way to do
that? Thanks!
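
For context, a minimal sketch of the Beam Metrics API surface: a metric is identified only by a namespace and a name, with no free-form labels, so one crude workaround - shown here as an assumption, not an endorsement - is to encode the label value into the metric name. Whether a runner or exporter then maps these onto GCP Monitoring labels is runner-specific.

--
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// "my_app" and the status-based names are illustrative.
class CountByStatusFn extends DoFn<String, String> {
  private final Counter ok = Metrics.counter("my_app", "requests_status_ok");
  private final Counter err = Metrics.counter("my_app", "requests_status_error");

  @ProcessElement
  public void processElement(ProcessContext c) {
    if ("OK".equals(c.element())) {
      ok.inc();   // one counter per label value
    } else {
      err.inc();
    }
    c.output(c.element());
  }
}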


Re: Is there a way to generate a bounded sequence emitted at a particular rate

2023-05-12 Thread Pavel Solomin
The Direct runner was meant to be a test-only runner, not a production
runner, and I don't know whether it behaves well with batch processing of
large data volumes. Do you experience the same issues when you run
everything on the Flink runner?

The Beam codebase has integration tests with the Direct runner - those
include writing and reading:

- Legacy:
https://github.com/apache/beam/blob/master/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
- AWS SDK V2:
https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/testing/KinesisIOIT.java

Maybe it will help you to set up yours.
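
For example, the rough shape of such a test - a sketch only, assuming JUnit 4 and the legacy connector; the stream name is illustrative, and partitioning plus AWS client configuration are omitted:

--
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.Rule;
import org.junit.Test;

public class KinesisWriteIT {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void writesBoundedSequenceToKinesis() {
    pipeline
        .apply(GenerateSequence.from(0).to(100))
        .apply(MapElements.into(TypeDescriptor.of(byte[].class))
            .via((Long i) -> Long.toString(i).getBytes(StandardCharsets.UTF_8)))
        .apply(KinesisIO.write()
            .withStreamName("test-stream")
            // partition key, AWS credentials, etc.
        );
    pipeline.run().waitUntilFinish();
  }
}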

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin






On Fri, 12 May 2023 at 16:48, Sachin Mittal  wrote:

> I am using a direct runner.
>
> If I remove the
> .withRate(1, Duration.standardSeconds(5))
>
> then KinesisIO writes to Kinesis; however, it receives all the input
> records at once and then throws:
> *KPL Expiration reached while waiting in limiter*
>
> I suppose we have certain limitations with the Direct runner (which I am
> only using for writing test cases).
> The real example will run on the Flink runner.
>
> Thanks
> Sachin
>
>
> On Fri, May 12, 2023 at 9:09 PM Pavel Solomin 
> wrote:
>
>> Hello!
>>
>> > this does not seem to generate numbers at that rate (1 per 5 seconds);
>> they are all emitted at once
>>
>> What runner do you use? I've seen that behavior of GenerateSequence only
>> in Direct runner.
>>
>> > Also it looks like it may be creating an unbounded collection, and
>> KinesisIO is not writing anything to the stream.
>>
>> I've never seen that happen, and I have used KinesisIO quite a lot
>> recently in my playgrounds - in the same way you do, generating sequences
>> and writing to Kinesis. Can you share a full reproducible example of the
>> stuck KinesisIO?
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>> 
>>
>>
>>
>>
>>
>> On Fri, 12 May 2023 at 15:04, Sachin Mittal  wrote:
>>
>>> Hi,
>>> I want to emit a bounded sequence of numbers from 0 to n, but have
>>> downstream receive this sequence at a given rate.
>>>
>>> This is needed so that we can rate-limit the HTTP requests downstream.
>>>
>>> Say we generate a sequence from 1 to 100; then downstream would make 100
>>> such requests almost at the same time.
>>>
>>> So to add gaps I am trying something like this.
>>>
>>> Would code like this work?
>>> pipeline
>>>     .apply(GenerateSequence.from(0).to(100)
>>>         .withRate(1, Duration.standardSeconds(5)))
>>>     .apply(ParDo.of(new BatchDataLoad()))
>>>     .apply(KinesisIO.write()
>>>         .withStreamName(streamName)
>>>         // other configs
>>>     );
>>>
>>>
>>> Somehow this does not seem to generate numbers at that rate (1 per 5
>>> seconds); they are all emitted at once.
>>> It also looks like it may be creating an unbounded collection, and
>>> KinesisIO is not writing anything to the stream.
>>>
>>> If not, is there a way to achieve this?
>>>
>>> Thanks
>>> Sachin
>>>
>>>


Re: Is there a way to generate a bounded sequence emitted at a particular rate

2023-05-12 Thread Sachin Mittal
I am using a direct runner.

If I remove the
.withRate(1, Duration.standardSeconds(5))

then KinesisIO writes to Kinesis; however, it receives all the input
records at once and then throws:
*KPL Expiration reached while waiting in limiter*

I suppose we have certain limitations with the Direct runner (which I am
only using for writing test cases).
The real example will run on the Flink runner.

Thanks
Sachin


On Fri, May 12, 2023 at 9:09 PM Pavel Solomin  wrote:

> Hello!
>
> > this does not seem to generate numbers at that rate (1 per 5 seconds);
> they are all emitted at once
>
> What runner do you use? I've seen that behavior of GenerateSequence only
> in Direct runner.
>
> > Also it looks like it may be creating an unbounded collection, and
> KinesisIO is not writing anything to the stream.
>
> I've never seen that happen, and I have used KinesisIO quite a lot recently
> in my playgrounds - in the same way you do, generating sequences and writing
> to Kinesis. Can you share a full reproducible example of the stuck KinesisIO?
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> 
>
>
>
>
>
> On Fri, 12 May 2023 at 15:04, Sachin Mittal  wrote:
>
>> Hi,
>> I want to emit a bounded sequence of numbers from 0 to n, but have
>> downstream receive this sequence at a given rate.
>>
>> This is needed so that we can rate-limit the HTTP requests downstream.
>>
>> Say we generate a sequence from 1 to 100; then downstream would make 100
>> such requests almost at the same time.
>>
>> So to add gaps I am trying something like this.
>>
>> Would code like this work?
>> pipeline
>>     .apply(GenerateSequence.from(0).to(100)
>>         .withRate(1, Duration.standardSeconds(5)))
>>     .apply(ParDo.of(new BatchDataLoad()))
>>     .apply(KinesisIO.write()
>>         .withStreamName(streamName)
>>         // other configs
>>     );
>>
>>
>> Somehow this does not seem to generate numbers at that rate (1 per 5
>> seconds); they are all emitted at once.
>> It also looks like it may be creating an unbounded collection, and
>> KinesisIO is not writing anything to the stream.
>>
>> If not, is there a way to achieve this?
>>
>> Thanks
>> Sachin
>>
>>


Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

2023-05-12 Thread Sachin Mittal
Hi,
So I have prepared the write pipeline something like this:

--
writePipeline
    .apply(GenerateSequence.from(0).to(100))
    .apply(ParDo.of(new DoFn<Long, byte[]>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        long i = c.element();
        // Fetching data for step=i
        List<Data> data = fetchForInputStep(i);
        // output all the data one by one
        for (Data d : data) {
          c.output(d.asBytes());
        }
      }
    }))
    .apply(KinesisIO.write()
        .withStreamName(streamName)
        // other configs
    );

writePipeline.run().waitUntilFinish();

What I observe is that the part of the pipeline that pushes data to
Kinesis only runs after all the data has been loaded by the second apply
step. So hundreds of thousands of data records accumulate and are pushed
to Kinesis all at once, and we get the following error:
*KPL Expiration reached while waiting in limiter*

The logs are generated like this:
--
Extracting binaries to
/var/folders/30/knyj9z4d3psbd4s6kffqc500gn/T/amazon-kinesis-producer-native-binaries
.
[main.cc:384] Starting up main producer
.
[main.cc:395] Entering join
.
Fetching data for step=1
.
Fetching data for step=100
.
[kinesis_producer.cc:200] Created pipeline for stream "xx"
[shard_map.cc:87] Updating shard map for stream "xx"
[shard_map.cc:148] Successfully updated shard map for stream "xx" found
1 shards
[processing_statistics_logger.cc:111] Stage 1 Triggers: { stream: 'xx',
manual: 10, count: 0, size: 4688, matches: 0, timed: 0, UserRecords:
742018, KinesisRecords: 4698 }


I had assumed that as soon as the data for step 1 was fetched it would be
passed downstream, and that the Kinesis producer pipeline would have been
created and started writing to Kinesis much earlier, but this happens only
after all the data is collected.

Is there a way to fix this?

Thanks
Sachin
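
A side note on the error itself: as far as I understand the KPL, "Expiration reached while waiting in limiter" means records waited in the KPL's rate limiter longer than their time-to-live and were dropped. Besides fixing the batching behavior, the producer can be given more headroom - a sketch, assuming the legacy connector's withProducerProperties() and the standard KPL keys RecordTtl and RateLimit:

--
Properties producerProps = new Properties();
producerProps.setProperty("RecordTtl", "60000"); // ms a record may wait before the KPL drops it
producerProps.setProperty("RateLimit", "100");   // % of per-shard throughput the KPL targets

writePipeline
    // ... GenerateSequence + fetch step as above ...
    .apply(KinesisIO.write()
        .withStreamName(streamName)
        .withProducerProperties(producerProps)
        // other configs
    );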



On Wed, May 10, 2023 at 4:29 PM Pavel Solomin  wrote:

> > two pipeline objects in my application
>
> I think this should work. I meant to have 2 separate artifacts and deploy
> them separately, but if your app runs batch processing with 2 sequential
> steps, 2 pipelines should work too:
>
> - writePipeline.run().waitUntilFinish()
> - readAndWritePipeline.run().waitUntilFinish()
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> 
>
>
>
>
>
> On Wed, 10 May 2023 at 11:49, Sachin Mittal  wrote:
>
>> Use case is something like this:
>> A source writes data to Kinesis, and the same data is used to compute
>> derived data, which is written back to the same stream so the next level
>> of derived data can be computed from the previous level, and so on.
>>
>> Would there be any issues from the Beam side in doing this within a
>> single pipeline?
>>
>> When you say I have to split my app into two, do you mean that I have to
>> create two pipeline objects in my application?
>>
>> If so, how will the application end?
>>
>> Note that the source is of finite size, and it gets written into Kinesis.
>>
>> Also, we do plan to migrate to the aws2 IO, but later. If aws1 has some
>> limitations in achieving what we want, then please let me know.
>>
>> Thanks
>>
>>
>> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin 
>> wrote:
>>
>>> Hello!
>>>
>>> I've never seen use-cases where it would be necessary. What are you
>>> trying to achieve? Some context would be helpful.
>>> Your example looks like you can split your app into two - one writes
>>> into streamName and the other reads from streamName.
>>>
>>> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is a legacy connector and
>>> is no longer maintained. Better to use this instead:
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> 
>>>
>>>
>>>
>>>
>>>
>>> On Wed, 10 May 2023 at 10:50, Sachin Mittal  wrote:
>>>
 Hi,
 I am using the Beam AWS SDK v1 IO to read from and write to a Kinesis stream.
 *org.apache.beam.sdk.io.kinesis.KinesisIO*


 My pipeline is something like this: (*note the kinesis stream used to
 write to and then again read from is empty before starting the app*)

 ---
 Pipeline pipeline = Pipeline.create(options);

 PCollection<byte[]> input = pipeline.apply(/* read from some source */);

 // populate an empty kinesis stream
 input
     .apply(
         KinesisIO.write()
             .withStreamName(streamName)
             // other IO configs
     );

 // within same application start another pipeline

Re: Is there a way to generate a bounded sequence emitted at a particular rate

2023-05-12 Thread Pavel Solomin
Hello!

> this does not seem to generate numbers at that rate (1 per 5 seconds);
they are all emitted at once

What runner do you use? I've seen that behavior of GenerateSequence only in
Direct runner.

> Also it looks like it may be creating an unbounded collection, and
KinesisIO is not writing anything to the stream.

I've never seen that happen, and I have used KinesisIO quite a lot recently
in my playgrounds - in the same way you do, generating sequences and writing
to Kinesis. Can you share a full reproducible example of the stuck KinesisIO?

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin






On Fri, 12 May 2023 at 15:04, Sachin Mittal  wrote:

> Hi,
> I want to emit a bounded sequence of numbers from 0 to n, but have
> downstream receive this sequence at a given rate.
>
> This is needed so that we can rate-limit the HTTP requests downstream.
>
> Say we generate a sequence from 1 to 100; then downstream would make 100
> such requests almost at the same time.
>
> So to add gaps I am trying something like this.
>
> Would code like this work?
> pipeline
>     .apply(GenerateSequence.from(0).to(100)
>         .withRate(1, Duration.standardSeconds(5)))
>     .apply(ParDo.of(new BatchDataLoad()))
>     .apply(KinesisIO.write()
>         .withStreamName(streamName)
>         // other configs
>     );
>
>
> Somehow this does not seem to generate numbers at that rate (1 per 5
> seconds); they are all emitted at once.
> It also looks like it may be creating an unbounded collection, and
> KinesisIO is not writing anything to the stream.
>
> If not, is there a way to achieve this?
>
> Thanks
> Sachin
>
>


Is there a way to generate a bounded sequence emitted at a particular rate

2023-05-12 Thread Sachin Mittal
Hi,
I want to emit a bounded sequence of numbers from 0 to n, but have
downstream receive this sequence at a given rate.

This is needed so that we can rate-limit the HTTP requests downstream.

Say we generate a sequence from 1 to 100; then downstream would make 100
such requests almost at the same time.

So to add gaps I am trying something like this.

Would code like this work?
pipeline
    .apply(GenerateSequence.from(0).to(100)
        .withRate(1, Duration.standardSeconds(5)))
    .apply(ParDo.of(new BatchDataLoad()))
    .apply(KinesisIO.write()
        .withStreamName(streamName)
        // other configs
    );


Somehow this does not seem to generate numbers at that rate (1 per 5
seconds); they are all emitted at once.
It also looks like it may be creating an unbounded collection, and
KinesisIO is not writing anything to the stream.

If not, is there a way to achieve this?

Thanks
Sachin
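
One crude, test-only workaround, under the assumption of a single-threaded local run: keep the bounded GenerateSequence and space elements out with a sleep inside a DoFn. This is not a real rate limiter - with any parallelism the sleeps overlap - but it can add the gaps for a local test. BatchDataLoad and the 5-second interval are taken from the example above:

--
pipeline
    .apply(GenerateSequence.from(0).to(100))
    .apply(ParDo.of(new DoFn<Long, Long>() {
      @ProcessElement
      public void processElement(ProcessContext c) throws InterruptedException {
        Thread.sleep(5000); // roughly one element per 5 seconds per thread
        c.output(c.element());
      }
    }))
    .apply(ParDo.of(new BatchDataLoad()))
    .apply(KinesisIO.write()
        .withStreamName(streamName)
        // other configs
    );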