> two pipeline objects in my application

I think this should work. I originally meant having 2 separate artifacts and
deploying them separately, but if your app runs batch processing with 2
sequential steps, 2 pipelines in one application should work too (a rough
sketch follows the list):

- writePipeline.run().waitUntilFinish()
- readAndWritePipeline.run().waitUntilFinish()
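
For example, a minimal sketch of that structure, assuming your source is
bounded (finite) and reusing the placeholder names (options, streamName,
duration, otherStreamName) from your snippet quoted below:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

// 1st pipeline: populate the stream, and block until it has finished
Pipeline writePipeline = Pipeline.create(options);
writePipeline
    .apply(/* read from some source */)
    .apply(KinesisIO.write().withStreamName(streamName) /* other IO configs */);
writePipeline.run().waitUntilFinish();

// 2nd pipeline: read the stream back from the start; withMaxReadTime bounds
// the otherwise-unbounded Kinesis read, so the application can terminate
Pipeline readAndWritePipeline = Pipeline.create(options);
readAndWritePipeline
    .apply(
        KinesisIO.read()
            .withStreamName(streamName)
            .withMaxReadTime(duration)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON))
    .apply(/* other transformations */)
    .apply(KinesisIO.write().withStreamName(otherStreamName) /* other IO configs */);
readAndWritePipeline.run().waitUntilFinish();

The application ends once the second waitUntilFinish() returns.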

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>





On Wed, 10 May 2023 at 11:49, Sachin Mittal <sjmit...@gmail.com> wrote:

> The use case is something like this:
> A source writes data to Kinesis, and the same data is used to compute
> derived data, which is written back to the same stream so that the next
> level of derived data can be computed from the previous one, and so on.
>
> Would there be any issues on the Beam side with doing this within a single
> pipeline?
>
> When you say I have to split my app into two, do you mean that I have to
> create two pipeline objects in my application?
>
> If so, then how will the application end?
>
> Note that the source data written into Kinesis is of finite size.
>
> Also, we do plan to migrate to the aws2 IO, but later. If aws1 has some
> limitation in achieving what we want, then please let me know.
>
> Thanks
>
>
> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin <p.o.solo...@gmail.com>
> wrote:
>
>> Hello!
>>
>> I've never seen use cases where this would be necessary. What are you
>> trying to achieve? Some context would be helpful.
>> Your example looks like you can split your app into two - one writes into
>> streamName and the other reads from streamName.
>>
>> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is a legacy connector and
>> is not maintained anymore. Better to use this instead:
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
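>>
>> A read with the aws2 connector looks almost the same - a minimal sketch
>> (I assume InitialPositionInStream here comes from the KCL v2 library;
>> worth verifying the exact configs against the javadoc above):
>>
>> import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
>> import software.amazon.kinesis.common.InitialPositionInStream;
>>
>> pipeline.apply(
>>     KinesisIO.read()
>>         .withStreamName(streamName)
>>         .withMaxReadTime(duration)
>>         .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON));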
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>> <https://www.linkedin.com/in/pavelsolomin>
>>
>>
>>
>>
>>
>> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> Hi,
>>> I am using the Beam AWS SDK v1 connector
>>> (*org.apache.beam.sdk.io.kinesis.KinesisIO*) to read from and write to a
>>> kinesis stream.
>>>
>>>
>>> My pipeline is something like this (*note: the kinesis stream that is
>>> written to and then read back from is empty before the app starts*):
>>>
>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>> Pipeline pipeline = Pipeline.create(options);
>>>
>>> // KinesisIO.write() consumes byte[] payloads
>>> PCollection<byte[]> input = pipeline.apply(/* read from some source */);
>>>
>>> // populate an empty kinesis stream
>>> input.apply(
>>>     KinesisIO.write()
>>>         .withStreamName(streamName)
>>>     // other IO configs ....
>>> );
>>>
>>> // within the same application, read back from the same kinesis stream
>>> // (KinesisIO.read() yields KinesisRecords; the transformations below
>>> // are assumed to map them back to byte[] payloads)
>>> PCollection<byte[]> output =
>>>     pipeline
>>>         .apply(
>>>             KinesisIO.read()
>>>                 .withStreamName(streamName)
>>>                 // wait for some duration before deciding to close the pipeline
>>>                 .withMaxReadTime(duration)
>>>                 // read from the start of the stream
>>>                 .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
>>>             // other IO configs
>>>         )
>>>         .apply(/* apply other transformations */);
>>>
>>> // write transformed output to the same kinesis stream
>>> output.apply(
>>>     KinesisIO.write()
>>>         .withStreamName(streamName)
>>>     // other IO configs
>>> );
>>>
>>> // also write transformed output to some other kinesis stream
>>> output.apply(
>>>     KinesisIO.write()
>>>         .withStreamName(otherStreamName) // a different kinesis stream
>>>     // other IO configs
>>> );
>>>
>>> pipeline.run().waitUntilFinish();
>>>
>>>
>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>
>>> Will something like this work in a single beam application?
>>> Is there a better way of designing this?
>>>
>>> Right now I am trying to run this using a direct runner, but I am facing
>>> some issues in reading from the same kinesis stream again.
>>> It is actually able to read the records, but somehow the read records are
>>> not pushed downstream for further processing.
>>>
>>> Before debugging this further and looking into any logic issues or bugs
>>> in my code, I wanted to be sure that something like this is possible
>>> under Beam constructs.
>>>
>>> Please let me know your thoughts.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
