Hi,
I am using aws beam sdk1 to read from and write to a kinesis stream.
*org.apache.beam.sdk.io.kinesis.KinesisIO*


My pipeline is something like this: (*note the kinesis stream used to write
to and then again read from is empty before starting the app*)
---------------------------------------------------------------------------------------------------------------------------------------
Pipeline pipeline = Pipeline.create(options);

PCollection<> input = pipeline.apply(/* read from some source */);

// populate an empty kinesis stream
input
.apply(
KinesisIO.write()
.withStreamName(streamName)
// other IO configs ....
);

// within same application start another pipeline
// to read from some kinesis stream from start
PCollection<> output = pipeline
.apply(
KinesisIO.read()
.withStreamName(streamName)
.withMaxReadTime(duration) // wait for some duration before deciding to
close the pipeline
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) // from
start
// other IO configs
)
.apply(/* apply other transformations */);


// write transformed output to same kinesis stream
output
.apply(
KinesisIO.write()
.withStreamName(streamName)
// other IO configs
);

// also write transformed output to some other kinesis stream
output
.apply(
KinesisIO.write()
.withStreamName(otherStreamName) // a different kinesis stream
// other IO configs
);


pipeline.run().waitUntilFinish();

---------------------------------------------------------------------------------------------------------------------------------------

Will something like this work in a single beam application ?
Is there a better way of designing this ?

I am right now trying to run this using a direct runner but I am facing
some issues in reading from the same kinesis stream again.
It is actually able to read the records but somehow read records are not
pushed downstream for further processing.

Before debugging it further and looking into any logic issues or bugs in my
code, I wanted to be sure if something like this is possible under beam
constructs.

Please let me know your thoughts.

Thanks
Sachin

Reply via email to