Only with the direct runner. For now I have disabled aggregation in the KPL, and that appears to be working.
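For reference, a minimal sketch of how I switched aggregation off. It assumes the legacy KinesisIO.write() hands KPL settings through withProducerProperties(...) and that the KPL honours the "AggregationEnabled" key. The class and method names (KplSettings, withoutAggregation) are made up for illustration, and the Beam wiring is shown only as a comment because it needs the Kinesis connector on the classpath.

```java
import java.util.Properties;

class KplSettings {
    // Builds KPL producer properties with record aggregation turned off.
    // "AggregationEnabled" is assumed to be the KPL configuration key.
    public static Properties withoutAggregation() {
        Properties props = new Properties();
        props.setProperty("AggregationEnabled", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = withoutAggregation();
        // Assumed wiring into the write transform of the pipeline:
        // KinesisIO.write()
        //     .withStreamName(streamName)
        //     .withProducerProperties(props)
        //     // other configs
        System.out.println("AggregationEnabled=" + props.getProperty("AggregationEnabled"));
    }
}
```

With aggregation off, each user record is sent as its own Kinesis record, so throughput per shard may drop; I have only checked this on the direct runner so far.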
On Sat, 13 May 2023 at 3:35 AM, Pavel Solomin <p.o.solo...@gmail.com> wrote:

> > 100,000's of data records are accumulated and they are tried to be pushed to Kinesis all at once
>
> Does that happen only in direct runner? Or Flink runner behaves similarly?
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
> On Fri, 12 May 2023 at 16:43, Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hi,
>> So I have prepared the write pipeline something like this:
>>
>> --------------------------------------------------------------------------------------------------------------
>> writePipeline
>>     .apply(GenerateSequence.from(0).to(100))
>>     .apply(ParDo.of(new DoFn<Long, byte[]>() {
>>       @ProcessElement
>>       public void processElement(ProcessContext c) {
>>         long i = c.element();
>>         // Fetching data for step=i
>>         List<Data> data = fetchForInputStep(i);
>>         // output all the data one by one
>>         for (Data d : data) {
>>           c.output(d.asBytes());
>>         }
>>       }
>>     }))
>>     .apply(KinesisIO.write()
>>         .withStreamName(streamName)
>>         // other configs
>>     );
>>
>> writePipeline.run().waitUntilFinish();
>>
>> What I observe is that pipeline part to push data to kinesis is only
>> happening after the entire data is loaded by a second apply function.
>> So what happens is that 100,000's of data records are accumulated and
>> they are tried to be pushed to Kinesis all at once and we get following
>> error:
>> *KPL Expiration reached while waiting in limiter*
>>
>> The logs are generated like this:
>>
>> --------------------------------------------------------------------------------------------------------------
>> Extracting binaries to /var/folders/30/knyj9z4d3psbd4s6kffqc5000000gn/T/amazon-kinesis-producer-native-binaries
>> .........
>> [main.cc:384] Starting up main producer
>> .........
>> [main.cc:395] Entering join
>> .........
>> Fetching data for step=1
>> .........
>> Fetching data for step=100
>> .........
>> [kinesis_producer.cc:200] Created pipeline for stream "xxxxxx"
>> [shard_map.cc:87] Updating shard map for stream "xxxxxx"
>> [shard_map.cc:148] Successfully updated shard map for stream "xxxxxx" found 1 shards
>> [processing_statistics_logger.cc:111] Stage 1 Triggers: { stream: 'xxxxxx', manual: 10, count: 0, size: 4688, matches: 0, timed: 0, UserRecords: 742018, KinesisRecords: 4698 }
>>
>> I had assumed that as soon as step 1 data was fetched it would pass the
>> data downstream and the kinesis pipeline would have been created much
>> before and would have started writing to Kinesis much earlier, but this
>> is happening only after all the data is collected.
>>
>> Is there a way to fix this ?
>>
>> Thanks
>> Sachin
>>
>> On Wed, May 10, 2023 at 4:29 PM Pavel Solomin <p.o.solo...@gmail.com>
>> wrote:
>>
>>> > two pipeline objects in my application
>>>
>>> I think this should work. I meant to have 2 separate artifacts and
>>> deploy them separately, but if your app runs batch processing with 2
>>> sequential steps, 2 pipelines should work too:
>>>
>>> - writePipeline.run().waitUntilFinish()
>>> - readAndWritePipeline.run().waitUntilFinish()
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> <https://www.linkedin.com/in/pavelsolomin>
>>>
>>> On Wed, 10 May 2023 at 11:49, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>
>>>> Use case is something like this:
>>>> A source writes source data to kinesis and same is used to compute
>>>> derived data which is again written back to same stream so next level of
>>>> derived data can be computed from previous derived data and so on.
>>>>
>>>> Would there be any issues from beam side to do the same within a single
>>>> pipeline?
>>>>
>>>> When you say I have to split my app into two do you mean that I have to
>>>> create two pipeline objects in my application?
>>>>
>>>> If so then how will application end?
>>>>
>>>> Note that source is of finite size which gets written into kinesis.
>>>>
>>>> Also we do plan to migrate to aws2 io, but later. If aws1 has some
>>>> limitations in achieving what we want then please let me know.
>>>>
>>>> Thanks
>>>>
>>>> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin <p.o.solo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello!
>>>>>
>>>>> I've never seen use-cases where it would be necessary. What are you
>>>>> trying to achieve? Some context would be helpful.
>>>>> Your example looks like you can split your app into two - one writes
>>>>> into streamName and the others read from streamName.
>>>>>
>>>>> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is legacy connector and
>>>>> is not maintained anymore. Better to use this instead:
>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>>>>>
>>>>> Best Regards,
>>>>> Pavel Solomin
>>>>>
>>>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>>>> <https://www.linkedin.com/in/pavelsolomin>
>>>>>
>>>>> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sjmit...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am using aws beam sdk1 to read from and write to a kinesis stream.
>>>>>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>>>>>
>>>>>> My pipeline is something like this: (*note the kinesis stream used
>>>>>> to write to and then again read from is empty before starting the app*)
>>>>>>
>>>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>>
>>>>>> PCollection<> input = pipeline.apply(/* read from some source */);
>>>>>>
>>>>>> // populate an empty kinesis stream
>>>>>> input
>>>>>>     .apply(
>>>>>>         KinesisIO.write()
>>>>>>             .withStreamName(streamName)
>>>>>>             // other IO configs ....
>>>>>>     );
>>>>>>
>>>>>> // within same application start another pipeline
>>>>>> // to read from some kinesis stream from start
>>>>>> PCollection<> output = pipeline
>>>>>>     .apply(
>>>>>>         KinesisIO.read()
>>>>>>             .withStreamName(streamName)
>>>>>>             // wait for some duration before deciding to close the pipeline
>>>>>>             .withMaxReadTime(duration)
>>>>>>             // from start
>>>>>>             .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
>>>>>>             // other IO configs
>>>>>>     )
>>>>>>     .apply(/* apply other transformations */);
>>>>>>
>>>>>> // write transformed output to same kinesis stream
>>>>>> output
>>>>>>     .apply(
>>>>>>         KinesisIO.write()
>>>>>>             .withStreamName(streamName)
>>>>>>             // other IO configs
>>>>>>     );
>>>>>>
>>>>>> // also write transformed output to some other kinesis stream
>>>>>> output
>>>>>>     .apply(
>>>>>>         KinesisIO.write()
>>>>>>             .withStreamName(otherStreamName) // a different kinesis stream
>>>>>>             // other IO configs
>>>>>>     );
>>>>>>
>>>>>> pipeline.run().waitUntilFinish();
>>>>>>
>>>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>>>>
>>>>>> Will something like this work in a single beam application ?
>>>>>> Is there a better way of designing this ?
>>>>>>
>>>>>> I am right now trying to run this using a direct runner but I am
>>>>>> facing some issues in reading from the same kinesis stream again.
>>>>>> It is actually able to read the records but somehow read records are
>>>>>> not pushed downstream for further processing.
>>>>>>
>>>>>> Before debugging it further and looking into any logic issues or bugs
>>>>>> in my code, I wanted to be sure if something like this is possible under
>>>>>> beam constructs.
>>>>>>
>>>>>> Please let me know your thoughts.
>>>>>>
>>>>>> Thanks
>>>>>> Sachin