Guidance on general design approach for Flink

2024-01-30 Thread Mark Petronic
I am working on a new application to perform real-time anomaly detection
using Flink. I am relatively new to Flink but already have one application
in production that is fairly basic and of low throughput. This next one
will be more complex and much higher throughput.

My question is about handling late-arriving data. For this application, the
source data flows like this:

   - zip files, each containing a single JSON file, are pushed to an S3
   bucket by the many servers I need to monitor (N servers spread across M pools)
   - S3 then publishes an SQS event for each new zip file that
   arrives
   - I write a Flink source that reads each event, pulls the S3 file as it
   arrives, stream-unzips it, deserializes the JSON, flat-maps its contents into a
   DataStream, and then processes the data in 60-second tumbling windows
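The unzip part of the ingestion steps above can be sketched with nothing but the JDK's `java.util.zip`. This is a minimal sketch, assuming each S3 object is a zip holding exactly one JSON entry and that its bytes arrive as an `InputStream`; the S3/SQS client calls and the JSON library are deliberately left out, and the class name `ZipJsonReader` is hypothetical. In the Flink job this logic would sit inside the custom source or a flat-map step.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ZipJsonReader {

    /** Streams through a zip archive and returns its first file entry as UTF-8 text, without touching local disk. */
    public static String readSingleEntry(InputStream zipped) throws IOException {
        try (ZipInputStream zin = new ZipInputStream(zipped)) {
            ZipEntry entry;
            while ((entry = zin.getNextEntry()) != null) {
                if (!entry.isDirectory()) {
                    // The zip stream reports end-of-data at the end of the current entry.
                    return new String(zin.readAllBytes(), StandardCharsets.UTF_8);
                }
            }
        }
        throw new IOException("zip archive contains no file entry");
    }

    /** Builds a one-entry zip in memory, standing in for an object fetched from S3. */
    public static byte[] zipOf(String name, String body) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ZipOutputStream zout = new ZipOutputStream(buf)) {
            zout.putNextEntry(new ZipEntry(name));
            zout.write(body.getBytes(StandardCharsets.UTF_8));
            zout.closeEntry();
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] zip = zipOf("metrics.json", "[{\"ts\":0,\"value\":1.0}]");
        String json = readSingleEntry(new ByteArrayInputStream(zip));
        // Hand `json` to the JSON library of your choice, then emit one record per sample.
        System.out.println(json);
    }
}
```

Because `ZipInputStream` decompresses as it reads, the file never has to be written to local disk, which matches the stream-unzip step described in the post.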

Each file contains at most 300 seconds' worth of metrics or 1000
time-series records, whichever limit is reached first. When a server is
processing a lot of connections, its file fills up faster, so the JSON file
is closed as soon as the 1000-sample threshold is hit. When a server's
traffic is low, it emits the file once the 300-second threshold elapses,
regardless of how many samples it contains (so in this case a file holds
anywhere from 0 to 1000 samples).
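The file-closing rule above, and what it implies for lateness, can be sketched in a few lines. This is an illustration under stated assumptions: the class and method names are hypothetical, windows are taken to be epoch-aligned 60-second windows (Flink's default for tumbling windows without an offset), and upload and SQS delivery latency are ignored.

```java
public class FileRollPolicy {
    // Thresholds described in the post: 1000 samples or 300 seconds.
    static final int MAX_SAMPLES = 1000;
    static final long MAX_FILE_SPAN_MS = 300_000L;
    static final long WINDOW_MS = 60_000L;

    /** True when the server would close the current file and upload it. */
    public static boolean shouldClose(int samplesInFile, long openedAtMs, long nowMs) {
        return samplesInFile >= MAX_SAMPLES || nowMs - openedAtMs >= MAX_FILE_SPAN_MS;
    }

    /**
     * How long after the end of its 60 s window a sample becomes visible, if it
     * is written at sampleTsMs into a file that closes at closedAtMs.
     * Upload and SQS latency are ignored here.
     */
    public static long delayPastWindowEndMs(long sampleTsMs, long closedAtMs) {
        long windowEnd = Math.floorDiv(sampleTsMs, WINDOW_MS) * WINDOW_MS + WINDOW_MS;
        return closedAtMs - windowEnd;
    }

    public static void main(String[] args) {
        System.out.println(shouldClose(1000, 0, 15_000)); // busy server: true
        System.out.println(shouldClose(40, 0, 299_999));  // quiet server: false
        System.out.println(shouldClose(40, 0, 300_000));  // quiet server: true
        // First sample of a quiet server's file, written at t = 0:
        System.out.println(delayPastWindowEndMs(0, 300_000)); // 240000
    }
}
```

With these numbers, the first sample in a quiet server's file becomes visible about 240 seconds after its window has ended, before any upload or SQS delay is added, which is why the allowed lateness has to be several times the window size.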

It is this pattern that I am struggling with. I need to use one-minute
tumbling windows to aggregate these metrics. However, I may have to wait
300 seconds for a low-traffic server's file to be uploaded to S3, while
files from higher-traffic servers show up in S3 perhaps every 10 or 20
seconds (each filled with the 1000 samples that trigger the file's closure
and upload). Any of these files can contain part of the data that belongs
in the same 60-second window. The data from all of these servers must be
aggregated together: because the servers belong to pools, I need to group
the records by POOL, not just by server.
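The grouping requirement above (key by pool, then window) can be illustrated with plain-Java bookkeeping. In the Flink job this would correspond to calling `keyBy` on the pool and applying a 60-second `TumblingEventTimeWindows` window; the sketch below only mimics that keying, so it is easy to see why a pool's window cannot be final until the slowest server in that pool has delivered its file. `Sample`, the `serverToPool` lookup, and the `pool@windowStart` key format are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PoolWindowAggregator {
    static final long WINDOW_MS = 60_000L;

    /** One sample as flat-mapped out of a server's JSON file (hypothetical shape). */
    public static final class Sample {
        final String server;
        final long timestampMs;
        final double value;

        public Sample(String server, long timestampMs, double value) {
            this.server = server;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    /** Start of the epoch-aligned 60 s tumbling window that contains ts. */
    static long windowStart(long ts) {
        return Math.floorDiv(ts, WINDOW_MS) * WINDOW_MS;
    }

    /**
     * Sums sample values per (pool, window start). The grouping key is the pool,
     * not the server, so files from different servers in the same pool
     * contribute to the same window.
     */
    public static Map<String, Double> aggregate(List<Sample> samples, Map<String, String> serverToPool) {
        Map<String, Double> sums = new TreeMap<>();
        for (Sample s : samples) {
            String key = serverToPool.get(s.server) + "@" + windowStart(s.timestampMs);
            sums.merge(key, s.value, Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        Map<String, String> pools = Map.of("s1", "poolA", "s2", "poolA", "s3", "poolB");
        // Samples arrive in file order, not timestamp order: s2's file is late.
        List<Sample> arrivals = List.of(
                new Sample("s1", 5_000, 1.0),
                new Sample("s1", 61_000, 3.0),
                new Sample("s3", 10_000, 4.0),
                new Sample("s2", 59_000, 2.0));
        System.out.println(aggregate(arrivals, pools));
        // prints {poolA@0=3.0, poolA@60000=3.0, poolB@0=4.0}
    }
}
```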

So, my questions given that context are:

   1. Is a 60-second tumbling window with 300 seconds of allowed
   lateness, where the lateness is a multiple of the window size rather than
   a fraction of it, a fairly common pattern that is typically implemented in
   Flink? My sense is that this IS a reasonable problem for Flink to handle.
   2. I believe that with this approach there will potentially be 5-6
   60-second windows in flight for each grouping key to accommodate the
   allowed lateness, which means considerably more state and resources for
   the cluster to support, and therefore a larger cluster for a given
   workload. Is that assumption correct?
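The 5-6 estimate in question 2 can be checked with a small calculation. As I understand Flink's event-time window operator, a window's state is purged once the watermark reaches `window.maxTimestamp() + allowedLateness`, where `maxTimestamp()` is the window end minus 1 ms. The sketch below (hypothetical class name, and assuming the watermark tracks the newest data closely, so no windows sit ahead of it) counts how many windows a single key can still be holding at a given watermark.

```java
public class WindowsInFlight {

    /**
     * Number of tumbling windows whose state a single key may still hold at
     * watermark wm. Assumes every window up to the watermark received data and
     * that a window is purged once wm >= (end - 1) + allowedLatenessMs.
     */
    public static int count(long wm, long windowMs, long allowedLatenessMs) {
        int n = 0;
        long start = Math.floorDiv(wm, windowMs) * windowMs; // window containing wm
        while (start + windowMs - 1 + allowedLatenessMs > wm) { // not yet purged
            n++;
            start -= windowMs; // step back to the previous window
        }
        return n;
    }

    public static void main(String[] args) {
        long minute = 60_000L, lateness = 300_000L;
        System.out.println(count(600_000L, minute, lateness)); // 6
        System.out.println(count(630_000L, minute, lateness)); // 6
        System.out.println(count(659_999L, minute, lateness)); // 5
        System.out.println(count(630_000L, minute, 0L));       // 1: no lateness
    }
}
```

For a 60-second window and 300 seconds of allowed lateness this gives 5 or 6 windows per key, depending on where the watermark falls within the current minute, so the estimate in the question holds. Per-window state stays small if the aggregation is incremental (a `ReduceFunction` or `AggregateFunction` keeps one accumulator per key and window), whereas a `ProcessWindowFunction` on its own buffers every element until the window fires.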

I just want to make sure I am considering the right tool for this job, and I
would appreciate any input on this.


Re: Redis as a State Backend

2024-01-30 Thread Zakelly Lan
And I found some previous discussion, FYI:
1. https://issues.apache.org/jira/browse/FLINK-3035
2. https://www.mail-archive.com/dev@flink.apache.org/msg10666.html

Hope this helps.

Best,
Zakelly



Re: Redis as a State Backend

2024-01-30 Thread Zakelly Lan
Hi Chirag

That's an interesting idea. IIUC, storing key-value pairs in Redis is simple
to implement, but supporting checkpointing and recovery is relatively
challenging. A Flink checkpoint must be consistent across all stateful
operators, capturing each of them at the same point in the stream. For an
*embedded*, *file-based* key-value store like RocksDB, this is easier to
implement: the files that hold the state at a specific point in time can be
uploaded asynchronously.
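The point about embedded, file-based stores can be illustrated with a toy model of an asynchronous snapshot: a cheap synchronous step freezes the state as of the checkpoint, and the slow upload then runs in the background while processing continues. This is not Flink's code; the class and its methods are hypothetical, and the in-memory `Map.copyOf` stands in for what, as I understand it, RocksDB gets almost for free because its SST files are immutable once written.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncSnapshotDemo {
    private final Map<String, Long> state = new HashMap<>();
    private final ExecutorService uploader = Executors.newSingleThreadExecutor();

    public void put(String key, long value) {
        state.put(key, value);
    }

    /** Sync phase freezes an immutable copy; async phase "uploads" it off the processing thread. */
    public CompletableFuture<Map<String, Long>> snapshot(long checkpointId) {
        Map<String, Long> frozen = Map.copyOf(state); // state as of this checkpoint
        return CompletableFuture.supplyAsync(() -> upload(checkpointId, frozen), uploader);
    }

    private Map<String, Long> upload(long checkpointId, Map<String, Long> frozen) {
        // Stand-in for copying files to durable storage (for example S3 or HDFS).
        return frozen;
    }

    public void close() {
        uploader.shutdown();
    }

    public static void main(String[] args) throws Exception {
        AsyncSnapshotDemo op = new AsyncSnapshotDemo();
        op.put("poolA", 1);
        CompletableFuture<Map<String, Long>> checkpoint = op.snapshot(1);
        op.put("poolA", 2); // processing continues while the upload runs
        System.out.println(checkpoint.get()); // {poolA=1}
        op.close();
    }
}
```

With an external store like Redis there is no local, immutable file set to freeze, so producing a snapshot that lines up exactly with each operator's checkpoint is the harder part.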

Moreover, if you want to keep your state essentially in memory, why not
use the HashMapStateBackend? It avoids the overhead of serialization and
deserialization and, I would guess, may perform better than Redis.
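The serialization overhead mentioned above can be made concrete with a toy comparison. The HashMapStateBackend keeps state as Java objects on the heap, while RocksDB stores serialized bytes, and a Redis-backed store would too (plus a network round trip). The two hypothetical stores below count how many serialize/deserialize steps a simple read-modify-write counter costs in each style; this is an illustration, not a benchmark.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class SerdeCostDemo {

    /** Heap style: values stay Java objects, so reads and writes are plain map operations. */
    public static final class HeapStore {
        private final Map<String, Long> map = new HashMap<>();
        public long get(String k) { return map.getOrDefault(k, 0L); }
        public void put(String k, long v) { map.put(k, v); }
    }

    /** Byte style: values are kept serialized, so every access pays a serde step. */
    public static final class ByteStore {
        private final Map<String, byte[]> map = new HashMap<>();
        public int serdeCalls = 0;

        public long get(String k) {
            byte[] bytes = map.get(k);
            if (bytes == null) return 0L;
            serdeCalls++; // deserialize on read
            return ByteBuffer.wrap(bytes).getLong();
        }

        public void put(String k, long v) {
            serdeCalls++; // serialize on write
            map.put(k, ByteBuffer.allocate(Long.BYTES).putLong(v).array());
        }
    }

    public static void main(String[] args) {
        HeapStore heap = new HeapStore();
        ByteStore bytes = new ByteStore();
        for (int i = 0; i < 1000; i++) { // 1000 read-modify-write updates per store
            heap.put("poolA", heap.get("poolA") + 1);
            bytes.put("poolA", bytes.get("poolA") + 1);
        }
        int serde = bytes.serdeCalls; // read before the extra get below
        System.out.println(heap.get("poolA") + " " + bytes.get("poolA") + " " + serde);
        // prints 1000 1000 1999
    }
}
```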


Best,
Zakelly

On Tue, Jan 30, 2024 at 2:15 PM Chirag Dewan via user wrote:

> Hi,
>
> I was looking at FLIP-254: Redis Streams Connector and was
> wondering whether Flink ever considered Redis as a state backend. If so,
> why was it rejected in favor of RocksDB?
>
> If someone can point me towards any deep dives on why RocksDB is a better
> fit as a state backend, it would be helpful.
>
> Thanks,
> Chirag
>