Hi,
So I have prepared the write pipeline something like this:

--------------------------------------------------------------------------------------------------------------
writePipeline
    .apply(GenerateSequence.from(0).to(100))
    .apply(ParDo.of(new DoFn<Long, byte[]>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        long i = c.element();
        // Fetch data for step=i
        List<Data> data = fetchForInputStep(i);
        // Output the records one by one
        for (Data d : data) {
          c.output(d.asBytes());
        }
      }
    }))
    .apply(KinesisIO.write()
        .withStreamName(streamName)
        // other configs
    );

writePipeline.run().waitUntilFinish();

What I observe is that the part of the pipeline that pushes data to Kinesis
only starts after the entire dataset has been loaded by the second apply
(the ParDo). So hundreds of thousands of records accumulate, they are all
pushed to Kinesis at once, and we get the following error:
*KPL Expiration reached while waiting in limiter*

The logs are generated like this:
--------------------------------------------------------------------------------------------------------------
Extracting binaries to
/var/folders/30/knyj9z4d3psbd4s6kffqc5000000gn/T/amazon-kinesis-producer-native-binaries
.........
[main.cc:384] Starting up main producer
.........
[main.cc:395] Entering join
.........
Fetching data for step=1
.........
Fetching data for step=100
.........
[kinesis_producer.cc:200] Created pipeline for stream "xxxxxx"
[shard_map.cc:87] Updating shard map for stream "xxxxxx"
[shard_map.cc:148] Successfully updated shard map for stream "xxxxxx" found
1 shards
[processing_statistics_logger.cc:111] Stage 1 Triggers: { stream: 'xxxxxx',
manual: 10, count: 0, size: 4688, matches: 0, timed: 0, UserRecords:
742018, KinesisRecords: 4698 }


I had assumed that as soon as the data for step 1 was fetched it would be
passed downstream, so the Kinesis producer pipeline would be created much
earlier and would start writing to Kinesis long before all the steps were
done, but the write only happens after all the data has been collected.

Is there a way to fix this?
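
One knob I am considering (assuming the legacy KinesisIO.Write still exposes
withProducerProperties and that the standard KPL configuration keys like
RecordTtl and RateLimit apply here - I have not verified the right values)
is to relax the KPL limits so a burst waits longer instead of expiring in
the limiter:

--------------------------------------------------------------------------------------------------------------
import java.util.Properties;

Properties producerProps = new Properties();
// Hypothetical tuning values: raise the record TTL and lower the rate
// limit so a large burst is throttled rather than dropped by the limiter.
producerProps.setProperty("RecordTtl", "60000");
producerProps.setProperty("RateLimit", "100");

KinesisIO.write()
    .withStreamName(streamName)
    .withProducerProperties(producerProps)
    // other configs
--------------------------------------------------------------------------------------------------------------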

Thanks
Sachin



On Wed, May 10, 2023 at 4:29 PM Pavel Solomin <p.o.solo...@gmail.com> wrote:

> > two pipeline objects in my application
>
> I think this should work. I meant to have 2 separate artifacts and deploy
> them separately, but if your app runs batch processing with 2 sequential
> steps, 2 pipelines should work too:
>
> - writePipeline.run().waitUntilFinish()
> - readAndWritePipeline.run().waitUntilFinish()
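>
> A minimal sketch of that structure (pipeline names and configs are
> placeholders, not a complete example):
>
> Pipeline writePipeline = Pipeline.create(options);
> writePipeline
>     .apply(/* read from source */)
>     .apply(KinesisIO.write().withStreamName(streamName) /* other configs */);
> // blocks until the stream is fully populated
> writePipeline.run().waitUntilFinish();
>
> Pipeline readAndWritePipeline = Pipeline.create(options);
> readAndWritePipeline
>     .apply(KinesisIO.read()
>         .withStreamName(streamName)
>         .withMaxReadTime(duration) // bounded read, so the batch job can end
>         .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON))
>     .apply(/* transformations */)
>     .apply(KinesisIO.write().withStreamName(otherStreamName) /* other configs */);
> readAndWritePipeline.run().waitUntilFinish();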
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Wed, 10 May 2023 at 11:49, Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> The use case is something like this:
>> A source writes data to Kinesis, and that data is used to compute derived
>> data, which is written back to the same stream so the next level of
>> derived data can be computed from the previous level, and so on.
>>
>> Would there be any issues on the Beam side with doing this within a
>> single pipeline?
>>
>> When you say I have to split my app into two, do you mean that I have to
>> create two pipeline objects in my application?
>>
>> If so, how will the application end?
>>
>> Note that the source is of finite size, and all of it gets written into Kinesis.
>>
>> Also, we do plan to migrate to the aws2 IO, but later. If aws1 has some
>> limitations in achieving what we want, then please let me know.
>>
>> Thanks
>>
>>
>> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin <p.o.solo...@gmail.com>
>> wrote:
>>
>>> Hello!
>>>
>>> I've never seen use cases where this would be necessary. What are you
>>> trying to achieve? Some context would be helpful.
>>> Your example looks like you could split your app into two - one that
>>> writes into streamName and another that reads from streamName.
>>>
>>> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is a legacy connector and
>>> is no longer maintained. It is better to use this instead:
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
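>>>
>>> For example, the read side looks almost the same in the new connector -
>>> this is just a sketch, and I have not checked that every builder method
>>> matches the legacy connector one-to-one:
>>>
>>> // org.apache.beam.sdk.io.aws2.kinesis.KinesisIO
>>> pipeline.apply(
>>>     KinesisIO.read()
>>>         .withStreamName(streamName)
>>>         .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON));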
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> <https://www.linkedin.com/in/pavelsolomin>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I am using the Beam AWS SDK v1 connector to read from and write to a Kinesis stream:
>>>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>>>
>>>>
>>>> My pipeline is something like this (*note: the Kinesis stream that is
>>>> written to and then read from again is empty before starting the app*):
>>>>
>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>> Pipeline pipeline = Pipeline.create(options);
>>>>
>>>> PCollection<byte[]> input = pipeline.apply(/* read from some source */);
>>>>
>>>> // populate an empty kinesis stream
>>>> input.apply(
>>>>     KinesisIO.write()
>>>>         .withStreamName(streamName)
>>>>         // other IO configs ...
>>>> );
>>>>
>>>> // within the same application, read back from the same kinesis stream
>>>> PCollection<byte[]> output = pipeline
>>>>     .apply(
>>>>         KinesisIO.read()
>>>>             .withStreamName(streamName)
>>>>             // wait for some duration before deciding to close the pipeline
>>>>             .withMaxReadTime(duration)
>>>>             // read from the start of the stream
>>>>             .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
>>>>             // other IO configs
>>>>     )
>>>>     .apply(/* apply other transformations */);
>>>>
>>>> // write transformed output back to the same kinesis stream
>>>> output.apply(
>>>>     KinesisIO.write()
>>>>         .withStreamName(streamName)
>>>>         // other IO configs
>>>> );
>>>>
>>>> // also write transformed output to some other kinesis stream
>>>> output.apply(
>>>>     KinesisIO.write()
>>>>         .withStreamName(otherStreamName) // a different kinesis stream
>>>>         // other IO configs
>>>> );
>>>>
>>>> pipeline.run().waitUntilFinish();
>>>>
>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>> Will something like this work in a single Beam application?
>>>> Is there a better way of designing this?
>>>>
>>>> Right now I am trying to run this using the direct runner, but I am
>>>> facing some issues in reading from the same Kinesis stream again.
>>>> It is actually able to read the records, but somehow the read records
>>>> are not pushed downstream for further processing.
>>>>
>>>> Before debugging it further and looking into any logic issues or bugs
>>>> in my code, I wanted to be sure that something like this is possible
>>>> under Beam constructs.
>>>>
>>>> Please let me know your thoughts.
>>>>
>>>> Thanks
>>>> Sachin
>>>>
>>>>
