Only direct runner.

I have now disabled aggregation in the KPL, and it appears to be
working.
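
A minimal sketch of what disabling aggregation could look like, assuming the KPL configuration key AggregationEnabled and assuming the properties are handed to the legacy connector through KinesisIO.write().withProducerProperties(...). The class name KplProducerProps and the helper withoutAggregation are hypothetical, and partitionKey in the comment is a placeholder:

```java
import java.util.Properties;

// Hypothetical helper class, named for illustration only.
class KplProducerProps {

    /** Builds KPL producer properties with record aggregation turned off. */
    static Properties withoutAggregation() {
        Properties props = new Properties();
        // "AggregationEnabled" is the KPL configuration key; the KPL default is true.
        props.setProperty("AggregationEnabled", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = withoutAggregation();
        System.out.println("AggregationEnabled=" + props.getProperty("AggregationEnabled"));

        // Assumed wiring into the legacy connector (requires Beam on the classpath,
        // so it is left as a comment here):
        //
        // KinesisIO.write()
        //     .withStreamName(streamName)
        //     .withPartitionKey(partitionKey)   // placeholder value
        //     .withProducerProperties(props);
    }
}
```

Only the one property mentioned above is set here; any other KPL tuning would be a separate experiment.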

On Sat, 13 May 2023 at 3:35 AM, Pavel Solomin <p.o.solo...@gmail.com> wrote:

> > hundreds of thousands of records are accumulated and then pushed to
> Kinesis all at once
>
> Does that happen only in direct runner? Or Flink runner behaves similarly?
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
> On Fri, 12 May 2023 at 16:43, Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hi,
>> So I have prepared the write pipeline something like this:
>>
>>
>> --------------------------------------------------------------------------------------------------------------
>> writePipeline
>>     .apply(GenerateSequence.from(0).to(100))
>>     .apply(ParDo.of(new DoFn<Long, byte[]>() {
>>       @ProcessElement
>>       public void processElement(ProcessContext c) {
>>         long i = c.element();
>>         // Fetching data for step=i
>>         List<Data> data = fetchForInputStep(i);
>>         // output all the data one by one
>>         for (Data d : data) {
>>           c.output(d.asBytes());
>>         }
>>       }
>>     }))
>>     .apply(KinesisIO.write()
>>         .withStreamName(streamName)
>>         // other configs
>>     );
>>
>> writePipeline.run().waitUntilFinish();
>>
>> What I observe is that the stage that pushes data to Kinesis starts only
>> after the second apply (the ParDo) has loaded the entire data set.
>> As a result, hundreds of thousands of records are accumulated and then
>> pushed to Kinesis all at once, and we get the following error:
>> *KPL Expiration reached while waiting in limiter*
>>
>> The logs are generated like this:
>>
>> --------------------------------------------------------------------------------------------------------------
>> Extracting binaries to
>> /var/folders/30/knyj9z4d3psbd4s6kffqc5000000gn/T/amazon-kinesis-producer-native-binaries
>> .........
>> [main.cc:384] Starting up main producer
>> .........
>> [main.cc:395] Entering join
>> .........
>> Fetching data for step=1
>> .........
>> Fetching data for step=100
>> .........
>> [kinesis_producer.cc:200] Created pipeline for stream "xxxxxx"
>> [shard_map.cc:87] Updating shard map for stream "xxxxxx"
>> [shard_map.cc:148] Successfully updated shard map for stream "xxxxxx"
>> found 1 shards
>> [processing_statistics_logger.cc:111] Stage 1 Triggers: { stream:
>> 'xxxxxx', manual: 10, count: 0, size: 4688, matches: 0, timed: 0,
>> UserRecords: 742018, KinesisRecords: 4698 }
>>
>>
>> I had assumed that as soon as the data for step 1 was fetched, it would
>> be passed downstream, and that the Kinesis pipeline would have been
>> created much earlier and started writing to Kinesis right away. Instead,
>> this happens only after all the data has been collected.
>>
>> Is there a way to fix this?
>>
>> Thanks
>> Sachin
>>
>>
>>
>> On Wed, May 10, 2023 at 4:29 PM Pavel Solomin <p.o.solo...@gmail.com>
>> wrote:
>>
>>> > two pipeline objects in my application
>>>
>>> I think this should work. I meant to have 2 separate artifacts and
>>> deploy them separately, but if your app runs batch processing with 2
>>> sequential steps, 2 pipelines should work too:
>>>
>>> - writePipeline.run().waitUntilFinish()
>>> - readAndWritePipeline.run().waitUntilFinish()
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> On Wed, 10 May 2023 at 11:49, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>
>>>> The use case is something like this:
>>>> A source writes source data to Kinesis, and that data is used to compute
>>>> derived data, which is written back to the same stream so that the next
>>>> level of derived data can be computed from the previous one, and so on.
>>>>
>>>> Would there be any issues on the Beam side with doing this within a
>>>> single pipeline?
>>>>
>>>> When you say I have to split my app into two, do you mean that I have to
>>>> create two pipeline objects in my application?
>>>>
>>>> If so, how will the application end?
>>>>
>>>> Note that the source is of finite size, and it gets written into Kinesis.
>>>>
>>>> Also, we do plan to migrate to the aws2 IO, but later. If aws1 has some
>>>> limitations in achieving what we want, then please let me know.
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin <p.o.solo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello!
>>>>>
>>>>> I've never seen a use case where this would be necessary. What are you
>>>>> trying to achieve? Some context would be helpful.
>>>>> Your example looks like you can split your app into two: one writes
>>>>> into streamName and the other reads from streamName.
>>>>>
>>>>> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is a legacy connector and
>>>>> is no longer maintained. It is better to use this instead:
>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>>>>>
>>>>> Best Regards,
>>>>> Pavel Solomin
>>>>>
>>>>> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sjmit...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am using the Beam AWS SDK 1 connector to read from and write to a
>>>>>> Kinesis stream:
>>>>>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>>>>>
>>>>>>
>>>>>> My pipeline is something like this (*note that the Kinesis stream that
>>>>>> is written to and then read from again is empty before the app starts*):
>>>>>>
>>>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>>
>>>>>> // KinesisIO.write() consumes byte[] elements
>>>>>> PCollection<byte[]> input = pipeline.apply(/* read from some source */);
>>>>>>
>>>>>> // populate an empty kinesis stream
>>>>>> input
>>>>>>     .apply(
>>>>>>         KinesisIO.write()
>>>>>>             .withStreamName(streamName)
>>>>>>             // other IO configs ....
>>>>>>     );
>>>>>>
>>>>>> // within same application start another pipeline
>>>>>> // to read from some kinesis stream from start
>>>>>> PCollection<byte[]> output = pipeline
>>>>>>     .apply(
>>>>>>         KinesisIO.read()
>>>>>>             .withStreamName(streamName)
>>>>>>             // wait for some duration before deciding to close the pipeline
>>>>>>             .withMaxReadTime(duration)
>>>>>>             // from start
>>>>>>             .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
>>>>>>             // other IO configs
>>>>>>     )
>>>>>>     .apply(/* apply other transformations */);
>>>>>>
>>>>>>
>>>>>> // write transformed output to same kinesis stream
>>>>>> output
>>>>>>     .apply(
>>>>>>         KinesisIO.write()
>>>>>>             .withStreamName(streamName)
>>>>>>             // other IO configs
>>>>>>     );
>>>>>>
>>>>>> // also write transformed output to some other kinesis stream
>>>>>> output
>>>>>>     .apply(
>>>>>>         KinesisIO.write()
>>>>>>             .withStreamName(otherStreamName) // a different kinesis stream
>>>>>>             // other IO configs
>>>>>>     );
>>>>>>
>>>>>>
>>>>>> pipeline.run().waitUntilFinish();
>>>>>>
>>>>>>
>>>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>>>>
>>>>>> Will something like this work in a single Beam application?
>>>>>> Is there a better way of designing this?
>>>>>>
>>>>>> I am currently trying to run this with the direct runner, but I am
>>>>>> facing some issues reading from the same Kinesis stream again.
>>>>>> It is able to read the records, but somehow the records it reads are
>>>>>> not pushed downstream for further processing.
>>>>>>
>>>>>> Before debugging it further and looking into any logic issues or bugs
>>>>>> in my code, I wanted to be sure that something like this is possible
>>>>>> with Beam's constructs.
>>>>>>
>>>>>> Please let me know your thoughts.
>>>>>>
>>>>>> Thanks
>>>>>> Sachin
>>>>>>
>>>>>>
