> two pipeline objects in my application

I think this should work. I meant having 2 separate artifacts and deploying them separately, but if your app runs batch processing with 2 sequential steps, 2 pipelines within one application should work too:

- writePipeline.run().waitUntilFinish()
- readAndWritePipeline.run().waitUntilFinish()
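For illustration, a minimal sketch of that shape (note: the class and stream names, the partition key, the toy Create.of(...) source and the 5-minute read bound are placeholders I made up, and the AWS client / region configs that KinesisIO requires are omitted - substitute your own):

---------------------------------------------------------------------
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;

public class TwoPipelinesSketch {

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    String streamName = "my-stream";          // placeholder
    String otherStreamName = "other-stream";  // placeholder

    // Pipeline 1: populate the (initially empty) stream.
    Pipeline writePipeline = Pipeline.create(options);
    writePipeline
        .apply(Create.of("a", "b", "c")) // stands in for your finite source
        .apply(MapElements.into(TypeDescriptor.of(byte[].class))
            .via((String s) -> s.getBytes(StandardCharsets.UTF_8)))
        .apply(KinesisIO.write()
            .withStreamName(streamName)
            .withPartitionKey("pk")); // plus your AWS client configs

    // Blocks until all source data has been written to the stream.
    writePipeline.run().waitUntilFinish();

    // Pipeline 2: starts only after pipeline 1 has finished, reads the
    // stream from the beginning, and stops after a bounded read time.
    Pipeline readAndWritePipeline = Pipeline.create(options);
    readAndWritePipeline
        .apply(KinesisIO.read()
            .withStreamName(streamName)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
            .withMaxReadTime(Duration.standardMinutes(5))) // bounds the unbounded read
        .apply(MapElements.into(TypeDescriptor.of(byte[].class))
            .via(KinesisRecord::getDataAsBytes)) // your real transformations go here
        .apply(KinesisIO.write()
            .withStreamName(otherStreamName)
            .withPartitionKey("pk"));

    readAndWritePipeline.run().waitUntilFinish();
  }
}
---------------------------------------------------------------------

waitUntilFinish() on the first pipeline guarantees the stream is fully populated before the second pipeline starts reading from TRIM_HORIZON, and withMaxReadTime bounds the otherwise unbounded Kinesis read so the second pipeline can terminate.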
Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>


On Wed, 10 May 2023 at 11:49, Sachin Mittal <sjmit...@gmail.com> wrote:

> The use case is something like this:
> A source writes data to Kinesis, and the same data is used to compute
> derived data, which is written back to the same stream so that the next
> level of derived data can be computed from the previous one, and so on.
>
> Would there be any issues on the Beam side with doing this within a
> single pipeline?
>
> When you say I have to split my app into two, do you mean that I have to
> create two pipeline objects in my application?
>
> If so, then how will the application end?
>
> Note that the source is of finite size, and it gets written into Kinesis.
>
> Also, we do plan to migrate to the aws2 IO, but later. If aws1 has some
> limitations in achieving what we want, then please let me know.
>
> Thanks
>
>
> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin <p.o.solo...@gmail.com>
> wrote:
>
>> Hello!
>>
>> I've never seen use-cases where it would be necessary. What are you
>> trying to achieve? Some context would be helpful.
>> Your example looks like you can split your app into two - one writes
>> into streamName and the other reads from streamName.
>>
>> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is a legacy connector and
>> is not maintained anymore. Better to use this one instead:
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>> <https://www.linkedin.com/in/pavelsolomin>
>>
>>
>> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> Hi,
>>> I am using the aws1 Beam SDK to read from and write to a Kinesis stream:
>>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>>
>>> My pipeline is something like this (*note: the Kinesis stream that is
>>> written to and then read from again is empty before the app starts*):
>>>
>>> ---------------------------------------------------------------------
>>> Pipeline pipeline = Pipeline.create(options);
>>>
>>> PCollection<> input = pipeline.apply(/* read from some source */);
>>>
>>> // populate an empty kinesis stream
>>> input.apply(
>>>     KinesisIO.write()
>>>         .withStreamName(streamName)
>>>         // other IO configs ....
>>> );
>>>
>>> // within the same application, read back from the same kinesis
>>> // stream, from the start
>>> PCollection<> output = pipeline
>>>     .apply(
>>>         KinesisIO.read()
>>>             .withStreamName(streamName)
>>>             // wait for some duration before deciding to close the pipeline
>>>             .withMaxReadTime(duration)
>>>             // from start
>>>             .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
>>>             // other IO configs
>>>     )
>>>     .apply(/* apply other transformations */);
>>>
>>> // write transformed output to the same kinesis stream
>>> output.apply(
>>>     KinesisIO.write()
>>>         .withStreamName(streamName)
>>>         // other IO configs
>>> );
>>>
>>> // also write transformed output to some other kinesis stream
>>> output.apply(
>>>     KinesisIO.write()
>>>         .withStreamName(otherStreamName) // a different kinesis stream
>>>         // other IO configs
>>> );
>>>
>>> pipeline.run().waitUntilFinish();
>>> ---------------------------------------------------------------------
>>>
>>> Will something like this work in a single Beam application?
>>> Is there a better way of designing this?
>>>
>>> Right now I am trying to run this using the direct runner, but I am
>>> facing some issues reading from the same Kinesis stream again.
>>> It is actually able to read the records, but somehow the read records
>>> are not pushed downstream for further processing.
>>>
>>> Before debugging it further and looking into any logic issues or bugs
>>> in my code, I wanted to be sure whether something like this is possible
>>> under Beam's constructs.
>>>
>>> Please let me know your thoughts.
>>>
>>> Thanks
>>> Sachin
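
P.S. About withMaxReadTime in your snippet: the aws1 connector can also bound the read by record count with withMaxNumRecords, which turns the unbounded Kinesis source into a bounded one. A sketch of how that would slot into your read (the 1000 is an arbitrary placeholder):

KinesisIO.read()
    .withStreamName(streamName)
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
    .withMaxNumRecords(1000)   // becomes a bounded read after 1000 records
    .withMaxReadTime(duration) // and/or keep the time bound

Either bound (or both) lets the pipeline finish instead of blocking on an endless stream.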