Re: Suggestions for aggregating records to a Kinesis Sink, (or generic Async Sink)?

2024-04-29 Thread Michael Marino
Hi Ahmed, hi Hong,

Thanks for your responses.

It sounds like the most promising approach would be to focus initially on the
global window with the custom trigger.

We don't need to be compatible with the aggregation used by the KPL
(we would likely combine records in protobuf, and my impression is that
the KPL really only supports combining records in something like
line-delimited JSON).
As for introducing a stateful operator, that is perhaps simply something
we have to accept, though it would be great if we could guarantee a flush
whenever a snapshot is taken.
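The count-or-time behavior discussed in this thread can be sketched independently of Flink. The class below is a minimal, framework-agnostic model under stated assumptions: `CountOrTimeoutBatcher`, `onElement`, `onTime` and `flush` are hypothetical names, times are plain millisecond values, and one instance stands in for the buffered state of a single key. In a real job the same logic would presumably live in a custom `Trigger` on a `GlobalWindows` assigner, returning `TriggerResult.FIRE_AND_PURGE` when either condition is met. As far as we can tell, the built-in `ProcessingTimeTrigger` registers its timer at the window's end, which for a global window is `Long.MAX_VALUE`, so the timeout likely needs its own timer (e.g. via `TriggerContext#registerProcessingTimeTimer`). The `flush()` method only models the flush-on-snapshot wish; we are not aware of a snapshot callback on the `Trigger` interface itself.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, framework-agnostic model of a "count OR timeout" trigger:
 * the buffered batch is emitted once it holds maxCount elements, or once
 * timeoutMs has passed since its first element, whichever comes first.
 */
class CountOrTimeoutBatcher<T> {
    private final int maxCount;
    private final long timeoutMs;
    private final List<T> buffer = new ArrayList<>();
    // Deadline of the current batch; only meaningful while the buffer is non-empty.
    private long deadlineMs;

    CountOrTimeoutBatcher(int maxCount, long timeoutMs) {
        if (maxCount <= 0 || timeoutMs <= 0) {
            throw new IllegalArgumentException("maxCount and timeoutMs must be positive");
        }
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /** Like Trigger#onElement: buffer the element and fire if the count is reached. */
    List<T> onElement(T element, long nowMs) {
        if (buffer.isEmpty()) {
            deadlineMs = nowMs + timeoutMs; // the timeout starts with the batch's first element
        }
        buffer.add(element);
        if (buffer.size() >= maxCount) {
            return flush();
        }
        return new ArrayList<>();
    }

    /** Like Trigger#onProcessingTime: fire if the current batch has timed out. */
    List<T> onTime(long nowMs) {
        if (!buffer.isEmpty() && nowMs >= deadlineMs) {
            return flush();
        }
        return new ArrayList<>();
    }

    /** Emit and clear everything buffered, e.g. right before a snapshot (hypothetical hook). */
    List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}
```

Note that a time-based flush only happens when something calls `onTime`; in a Flink trigger that role would be played by the registered processing-time timer.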

Cheers,
Mike

On Mon, Apr 29, 2024 at 1:12 PM Hong Liang wrote:

> Hi Michael, thanks for the question!
>
> Maybe you could consider using a global window with a custom trigger
> (CountTrigger + ProcessingTimeTrigger/EventTimeTrigger)? CountTrigger
> should allow you to fire the window once it holds X elements.
> ProcessingTimeTrigger/EventTimeTrigger should allow you to flush the window
> after a specified amount of time, even if the count has not been reached.
>
> A possible issue with this approach is that some elements might end up
> stored in snapshot state. I'm not aware of an easy way to "flush" items
> out of a custom window when a snapshot is taken.
>
> Hope the above helps.
>
> Regards,
> Hong

-- 

Michael Marino

Principal Data Science & Analytics

Phone: +49 89 7167786 - 14

linkedin.com/company/tadogmbh | facebook.com/tado | twitter.com/tado | youtube.com/tado

www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany

Managing Directors: Dr. Philip Beckmann | Christian Deilmann | Johannes Schwarz | Josef Wenzl

Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE 280012558


Re: Suggestions for aggregating records to a Kinesis Sink, (or generic Async Sink)?

2024-04-29 Thread Ahmed Hamdy
Hi Michael,
Unfortunately, the new `KinesisDataStreamsSink` doesn't support aggregation
yet.
If you want to use native Kinesis aggregation, my suggestion is to use the
latest connector version that supports the KPL as a sink for the Table API,
which would be 1.14.x. You could package that version of the connector with
your application.

> I was thinking about moving the logic to use window functions (either in
> the Table or DataStream API), but here I think I'd need to "close" the
> window based not only on time, but also on record number

Regarding this approach, I believe a better option might be to implement a
custom process function that holds batches of records in state and emits an
aggregated record. However, this would of course not be consistent with KPL
aggregation, and the individual records could not be retrieved through KPL
de-aggregation, so I would advise against taking that route.
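The caveat above is that a custom aggregation format is not KPL-compatible, so every consumer needs a matching de-aggregation step. A minimal sketch of such a format, assuming each row has already been serialized to bytes (for example as a protobuf message, as Mike suggests): each record is written as a 4-byte big-endian length followed by its bytes. `LengthPrefixedFraming`, `aggregate` and `deaggregate` are hypothetical names, and this framing is purely illustrative; it is not the KPL aggregation format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical aggregation format: each record is a 4-byte big-endian length
 * followed by the record's bytes. NOT compatible with the KPL format.
 */
final class LengthPrefixedFraming {
    private LengthPrefixedFraming() {}

    /** Producer side: pack several serialized records into one payload. */
    static byte[] aggregate(List<byte[]> records) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (byte[] record : records) {
                out.writeInt(record.length);
                out.write(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected with an in-memory stream
        }
        return bytes.toByteArray();
    }

    /** Consumer side: split a payload back into the original records. */
    static List<byte[]> deaggregate(byte[] payload) {
        List<byte[]> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            while (in.available() > 0) {
                int length = in.readInt();
                if (length < 0) {
                    throw new IOException("negative record length: " + length);
                }
                byte[] record = new byte[length];
                in.readFully(record); // throws EOFException if the payload is truncated
                records.add(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("malformed aggregated payload", e);
        }
        return records;
    }
}
```

If the records are protobuf messages anyway, protobuf-java's own delimited helpers (`writeDelimitedTo` / `parseDelimitedFrom`, which use varint length prefixes) may be a more natural choice; either way, the consumer side has to know which framing was used.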


Best Regards
Ahmed Hamdy




Suggestions for aggregating records to a Kinesis Sink, (or generic Async Sink)?

2024-04-29 Thread Michael Marino
Hi all,

We are currently using Flink 1.18.1 (AWS Managed Flink) and are writing to
Kinesis streams in several of our applications using the Table API.

In our use case, we would like to be able to aggregate multiple records
(rows) together and emit them in a single Kinesis record.

As far as I understand, with the Async Writer it is not possible to
aggregate records (Table rows) into a single record, as was previously
possible with the Kinesis Producer Library.

I wanted to ask whether anyone here has suggestions on how to do this, or
whether I perhaps missed it somewhere in the documentation. I was thinking about
moving the logic to use window functions (either in the Table or DataStream
API), but here I think I'd need to "close" the window based not only on
time, but also on record number. Anyway, any thoughts are appreciated!

Thanks,
Mike
