The use case is something like this: a source writes data to Kinesis, and that data is used to compute derived data, which is written back to the same stream, so the next level of derived data can be computed from the previous level, and so on.
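Concretely, I picture the level-by-level derivation as a driver loop, one bounded pipeline run per derivation level (a rough, unverified sketch; NUM_LEVELS, readWindow, streamName, and the transform are placeholders, and in practice one would track the last-read position rather than re-reading from TRIM_HORIZON every iteration):

```java
// Hypothetical driver sketch: run one bounded pipeline per derivation level,
// each reading what the previous run wrote back into the same stream.
for (int level = 0; level < NUM_LEVELS; level++) {
  Pipeline p = Pipeline.create(options);
  p.apply(
          KinesisIO.read()
              .withStreamName(streamName)
              .withMaxReadTime(readWindow) // bound the read so this run ends
              .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON))
      .apply(/* compute level + 1 derived data */)
      .apply(KinesisIO.write().withStreamName(streamName)); // feeds the next level
  p.run().waitUntilFinish(); // returns because the read is bounded
}
```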
Would there be any issues on the Beam side in doing this within a single pipeline?

When you say I have to split my app into two, do you mean that I have to create two Pipeline objects in my application? If so, how will the application end? Note that the source is of finite size and gets written into Kinesis.

Also, we do plan to migrate to the aws2 IO, but later. If aws1 has some limitations in achieving what we want, then please let me know.

Thanks

On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin <p.o.solo...@gmail.com> wrote:

> Hello!
>
> I've never seen use-cases where it would be necessary. What are you trying
> to achieve? Some context would be helpful.
> Your example looks like you can split your app into two - one writes into
> streamName and the other reads from streamName.
>
> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is a legacy connector and is
> not maintained anymore. Better to use this instead:
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hi,
>> I am using the aws beam sdk1 to read from and write to a kinesis stream:
>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>
>> My pipeline is something like this (*note: the kinesis stream written to
>> and then read from again is empty before the app starts*):
>>
>> ---------------------------------------------------------------------------------------------------------------------------------------
>> Pipeline pipeline = Pipeline.create(options);
>>
>> PCollection<> input = pipeline.apply(/* read from some source */);
>>
>> // populate an empty kinesis stream
>> input
>>     .apply(
>>         KinesisIO.write()
>>             .withStreamName(streamName)
>>             // other IO configs ...
>>     );
>>
>> // within the same application, read back from the same
>> // kinesis stream from the start
>> PCollection<> output = pipeline
>>     .apply(
>>         KinesisIO.read()
>>             .withStreamName(streamName)
>>             // wait for some duration before deciding to close the pipeline
>>             .withMaxReadTime(duration)
>>             // read from the start of the stream
>>             .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
>>             // other IO configs
>>     )
>>     .apply(/* apply other transformations */);
>>
>> // write transformed output to the same kinesis stream
>> output
>>     .apply(
>>         KinesisIO.write()
>>             .withStreamName(streamName)
>>             // other IO configs
>>     );
>>
>> // also write transformed output to some other kinesis stream
>> output
>>     .apply(
>>         KinesisIO.write()
>>             .withStreamName(otherStreamName) // a different kinesis stream
>>             // other IO configs
>>     );
>>
>> pipeline.run().waitUntilFinish();
>> ---------------------------------------------------------------------------------------------------------------------------------------
>>
>> Will something like this work in a single beam application?
>> Is there a better way of designing this?
>>
>> I am right now trying to run this using the direct runner, but I am facing
>> some issues in reading from the same kinesis stream again.
>> It is actually able to read the records, but somehow the read records are
>> not pushed downstream for further processing.
>>
>> Before debugging this further and looking into any logic issues or bugs in
>> my code, I wanted to be sure that something like this is possible under
>> beam constructs.
>>
>> Please let me know your thoughts.
>>
>> Thanks
>> Sachin
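P.S.: for reference, the two-Pipeline shape I understand the suggestion to mean would look something like the sketch below. It is a rough, unverified outline that reuses the same calls as the snippet above (streamName, duration, and options are placeholders); since the source is finite, the first run terminates, and withMaxReadTime bounds the second, so the application can end:

```java
// Sketch: two Pipeline objects run one after the other in the same application.

// Pipeline 1: write the finite source data into the stream.
Pipeline writePipeline = Pipeline.create(options);
writePipeline
    .apply(/* read from some finite source */)
    .apply(KinesisIO.write().withStreamName(streamName) /* other IO configs */);
writePipeline.run().waitUntilFinish(); // blocks until all source data is written

// Pipeline 2: read the stream from the start, bounded by maxReadTime.
Pipeline readPipeline = Pipeline.create(options);
readPipeline
    .apply(
        KinesisIO.read()
            .withStreamName(streamName)
            .withMaxReadTime(duration) // makes the unbounded source bounded
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON))
    .apply(/* transformations, then the two KinesisIO.write()s as above */);
readPipeline.run().waitUntilFinish(); // returns once maxReadTime elapses
```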