Re: FlinkCEP questions - architecture

2020-02-21 Thread Juergen Donnerstag
> sure if you can inspect it at runtime.
>
> 8) I understand state is stored per node. What happens if I want to
> add or remove nodes? Will the state still be found, despite it being
> stored on another node? I read that I need to be equally careful when
> changing rules? Or is that a different issue?
>
> -> Rescaling a Flink job is not done automatically. You need to take a
> savepoint and then relaunch your job with a different parallelism.
> Updating a rule is not supported in CEP, as changing a rule would
> imply that (potentially) the state should change. But what you could
> do is take a savepoint, remove the old pattern and add a new one (the
> updated one) and tell Flink to ignore the state of the previous
> operator (as said earlier each CEP pattern is translated to an
> operator).
>
> 9) How does garbage collection of temp CEP state work, or will it stay
> forever? For tracing/investigation reasons I can imagine that purging
> it at the earliest possible time is not always the best option. Maybe
> after 30 days or so.
>
> -> CEP cleans up state once the time horizon (specified with the
> .within() clause) expires.
>
> 10) Are there strategies to minimize temp CEP state? In SQL queries
> you filter first on the "smallest" attributes. CEP rules form a
> sequence. Hence that approach will not work. Is that an issue at all?
> What are practical limits on the CEP temp state storage engine?
>
> -> Such optimizations are not supported out of the box. I would
> recommend going with the Broadcast state approach in [3].
>
> 11) Occasionally we need to process about 200 files at once. Can I
> speed things up by processing all files in parallel on multiple nodes,
> despite their order (CEP use case)? This would only work if
> FlinkCEP in step 1 simply filters on all relevant events of a
> sequence and updates state, and in step 2, after the files are
> processed, evaluates whether the updated state matches the sequences.
>
> 12) Schema changes in the input files: Occasionally the DB source
> system schema is changed, and not always in a backwards-compatible way
> (new fields inserted in the middle), and the export then also has the
> new field in the middle. This means that starting from a specific (file)
> date, I need to consider a different schema. This must also be handled
> when re-running files for the last month, because of corrections
> provided. And if the file format has changed somewhere in the middle ...
>
> -> This seems to be relevant for the "data cleaning" phase, before you
> send your data to CEP. In this case, if the schema changes, then I
> assume that you need to update your initial parsing logic, which means
> taking a savepoint and redeploying the updated jobGraph with the new
> input parsing logic (if I understand correctly).
>
> thanks a lot for your time and your help
>
> I hope the above helps!
>
> Cheers,
> Kostas
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
> [2] https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
> [3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
>
> On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag wrote:

FlinkCEP questions - architecture

2020-02-10 Thread Juergen Donnerstag
Hi,

we're in the very early stages of evaluating options. I'm not a Flink expert,
but I have read some of the docs and watched some videos. Could you please help
me understand whether and how some of our requirements are covered by Flink
(CEP)? Is this mailing list the right channel for such questions?

1) We receive files every day, which are exports from some database tables,
containing ONLY the changes from that day. Most tables have modify-cols. Even
though they are files, because they contain only changes, I believe the
file records should be considered events in Flink terminology. Is that
assumption correct?

2) The records within the DB export files are NOT in chronological order, and
we cannot change the export. Our use case is a "complex event processing"
case (FlinkCEP) with rules like "KeyBy(someKey) If first A, then B, then C
within 30 days, then do something". Does that work with FlinkCEP even though
the events/records are not in chronological order within the file? The files
are 100MB to 20GB in size. Do I need to sort the files first before CEP
processing?
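For context on question 2: when FlinkCEP runs in event time, the CEP operator buffers incoming elements and hands them to the pattern matcher in timestamp order once the watermark has passed them; elements that arrive behind the watermark are treated as late. The plain-Java sketch below only illustrates that buffering idea. It is not Flink code, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of event-time buffering: release events in timestamp order up to the watermark.
final class EventTimeBuffer {
    record Event(String key, String type, long timestampMillis) {}

    private final PriorityQueue<Event> buffer =
        new PriorityQueue<>(Comparator.comparingLong(Event::timestampMillis));
    private final List<Event> late = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    // Buffer the event, or set it aside as late if the watermark has already passed it.
    void add(Event e) {
        if (e.timestampMillis() <= watermark) {
            late.add(e);
        } else {
            buffer.add(e);
        }
    }

    // Advance the watermark and release every buffered event at or below it, oldest first.
    List<Event> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Event> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestampMillis() <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    List<Event> lateEvents() {
        return late;
    }

    public static void main(String[] args) {
        EventTimeBuffer b = new EventTimeBuffer();
        // Records arrive out of order, as in an unsorted export file.
        b.add(new Event("k1", "C", 30));
        b.add(new Event("k1", "A", 10));
        b.add(new Event("k1", "B", 20));
        System.out.println(b.advanceWatermark(25)); // A and B, in timestamp order
    }
}
```

In this model the records inside one file need not be sorted, provided the watermark does not overtake timestamps that are still to be read; otherwise those records end up in the late list.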

3) Occasionally some crazy people manually "correct" DB records within the
database and manually trigger a re-export of ALL of the changes for that
respective day (e.g. last week's Tuesday). Consequently we receive a
correction file: same filename but with "_1" appended. All filenames include
the date (of the original export). What are the options to handle that case
(besides telling the DB admins not to, which we did already)? Take regular
checkpoints and re-process all files since then? What happens to the CEP
state? Will it be checkpointed as well?
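A sketch of one option for question 3, under two assumptions that do not come from the thread: a savepoint is taken after each day's files have been processed, and filenames follow a hypothetical `<table>_<yyyy-MM-dd>[_<n>].csv` layout. CEP state is keyed operator state, so it is part of a savepoint and rewinds with it. On a correction file, the job would be restarted from the latest savepoint taken before the corrected day, and every file from that day on would be replayed, using the correction in place of the original for that day.

```java
import java.time.LocalDate;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: recognise a correction file and choose the savepoint to restore from.
final class CorrectionPlanner {
    // Assumed filename layout: <table>_<yyyy-MM-dd>[_<n>].csv, e.g. orders_2020-02-04_1.csv
    private static final Pattern NAME = Pattern.compile(
        "(?<table>.+)_(?<date>\\d{4}-\\d{2}-\\d{2})(?:_(?<rev>\\d+))?\\.csv");

    record ExportFile(String table, LocalDate date, int revision) {
        boolean isCorrection() {
            return revision > 0;
        }
    }

    static ExportFile parse(String fileName) {
        Matcher m = NAME.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("unexpected file name: " + fileName);
        }
        String rev = m.group("rev");
        return new ExportFile(m.group("table"), LocalDate.parse(m.group("date")),
            rev == null ? 0 : Integer.parseInt(rev));
    }

    // savepoints maps "last export day fully processed" -> savepoint path (hypothetical bookkeeping).
    // Returns the latest savepoint that does not yet contain the corrected day,
    // or null if there is none (then everything would be replayed from the start).
    static Map.Entry<LocalDate, String> restorePoint(
            NavigableMap<LocalDate, String> savepoints, LocalDate correctedDay) {
        return savepoints.lowerEntry(correctedDay);
    }

    public static void main(String[] args) {
        NavigableMap<LocalDate, String> savepoints = new TreeMap<>();
        savepoints.put(LocalDate.of(2020, 2, 3), "savepoint-2020-02-03");
        savepoints.put(LocalDate.of(2020, 2, 5), "savepoint-2020-02-05");

        ExportFile f = parse("orders_2020-02-04_1.csv");
        if (f.isCorrection()) {
            // Restore here, then replay from 2020-02-04 using the correction file for that day.
            System.out.println(restorePoint(savepoints, f.date()).getValue());
        }
    }
}
```

Matches emitted since that savepoint would be produced again during the replay, so downstream consumers would need to tolerate duplicates.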

4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a problem?

5) We also have CEP rules that must fire if, after a start sequence matched,
the remaining sequence did NOT occur within a configured window. E.g. if A,
then B, but C did not occur within 30 days of A. Is that supported by
FlinkCEP? I couldn't find a working example.
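On question 5: a common way to express "A, then B, but no C within 30 days" in FlinkCEP is to define the full sequence A, B, C with `within(...)` and to treat partial matches that time out after A and B as the "C did not happen" signal; Flink delivers timed-out partial matches through `TimedOutPartialMatchHandler`. The plain-Java sketch below shows only that logic for a single key, assuming events arrive in timestamp order. Names are hypothetical and this is not the Flink API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch for a single key: alert on "A, then B, but no C within the window after A".
final class MissingFollowUp {
    // One open sequence, started by an A event.
    private static final class Partial {
        final long aTimestamp;
        boolean sawB;

        Partial(long aTimestamp) {
            this.aTimestamp = aTimestamp;
        }
    }

    private final long windowMillis;
    private final List<Partial> open = new ArrayList<>();

    MissingFollowUp(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Events of one key must be fed in timestamp order.
    void onEvent(String type, long ts) {
        switch (type) {
            case "A" -> open.add(new Partial(ts));
            // B only counts for sequences whose window is still open.
            case "B" -> open.forEach(p -> {
                if (ts - p.aTimestamp <= windowMillis) {
                    p.sawB = true;
                }
            });
            // A C inside the window completes the sequence, so no alert is needed.
            case "C" -> open.removeIf(p -> p.sawB && ts - p.aTimestamp <= windowMillis);
            default -> { } // other event types are irrelevant to this rule
        }
    }

    // Once event time passes A + window, sequences that saw B but no C raise an alert.
    List<Long> onWatermark(long watermark) {
        List<Long> alerts = new ArrayList<>();
        Iterator<Partial> it = open.iterator();
        while (it.hasNext()) {
            Partial p = it.next();
            if (p.aTimestamp + windowMillis <= watermark) {
                if (p.sawB) {
                    alerts.add(p.aTimestamp); // report the start of the incomplete sequence
                }
                it.remove(); // expired either way, so its state is released
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        long day = 86_400_000L;
        MissingFollowUp rule = new MissingFollowUp(30 * day);
        rule.onEvent("A", 0);
        rule.onEvent("B", 5 * day);
        // No C arrives; once event time reaches day 30, the sequence started at 0 is reported.
        System.out.println(rule.onWatermark(31 * day));
    }
}
```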

6) We expect 30-40 CEP rules. How can we estimate the required storage size
for the temporary CEP state? Is there some sort of formula considering the
number of rules, number of records per file or day, record size, window,
number of records matched per sequence, number of keyBy grouping keys, ...?
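For question 6, a rough upper bound can be computed by assuming every relevant event is kept once per rule for that rule's full window. The reply above notes that each CEP pattern is translated to its own operator, so state adds up across rules. FlinkCEP shares stored events between the partial matches of one pattern, but bookkeeping and state-backend overhead come on top; `overheadFactor` below is a placeholder guess, and the example inputs are made up, not measurements.

```java
// Hypothetical back-of-envelope estimate of CEP state size; every input is an assumption.
final class CepStateEstimate {
    // Upper bound for one rule: every relevant event is retained once for the rule's full window.
    // overheadFactor stands in for partial-match bookkeeping and state-backend overhead;
    // it is a guess that would need calibrating against a real job.
    static double ruleBytes(double relevantEventsPerDay, int windowDays,
                            int bytesPerEvent, double overheadFactor) {
        return relevantEventsPerDay * windowDays * bytesPerEvent * overheadFactor;
    }

    public static void main(String[] args) {
        // Made-up inputs: 30 rules with a 30-day window and 5 with 180 days,
        // 1,000,000 relevant events per day per rule, 200 bytes per event, 2x overhead.
        double total = 30 * ruleBytes(1_000_000, 30, 200, 2.0)
                     + 5 * ruleBytes(1_000_000, 180, 200, 2.0);
        System.out.printf("roughly %.0f GB%n", total / 1e9);
    }
}
```

Summing per rule with each rule's own window gives a noticeably tighter bound than applying the longest window (180 days) to all rules.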

7) I can imagine that for debugging reasons it'd be good if we were able to
query the temporary CEP state. What is the (CEP) schema used to persist the
CEP state and how can we query it? And does such a query work on the whole
cluster or only per node (e.g. because of shuffling, with each node
responsible for only a portion of the events)?

8) I understand state is stored per node. What happens if I want to add or
remove nodes? Will the state still be found, despite it being stored on
another node? I read that I need to be equally careful when changing rules?
Or is that a different issue?
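Regarding question 8: keyed state in Flink is tied to a key group rather than to a node. Each key is hashed into one of `maxParallelism` key groups, and contiguous ranges of key groups are assigned to the parallel subtasks. When a job is restored from a savepoint with a different parallelism (the rescaling procedure described in the reply above), whole key groups are redistributed, so the state for a key is found wherever its key group lands. Below is a plain-Java sketch of that mapping; Flink itself applies a murmur hash to `hashCode()` (see `KeyGroupRangeAssignment`), which the `floorMod` here simplifies.

```java
// Hypothetical sketch of Flink-style key-group assignment (hash simplified).
final class KeyGroups {
    // Key -> key group. Flink applies a murmur hash to hashCode(); floorMod keeps the sketch short.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key group -> subtask: each subtask owns a contiguous range of key groups.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // fixed once the job has state
        for (String key : new String[] {"customer-1", "customer-2", "customer-3"}) {
            int group = keyGroupFor(key, maxParallelism);
            // The key group never changes; only the subtask that owns it does.
            System.out.printf("%s -> key group %d, subtask %d at p=2, subtask %d at p=4%n",
                key, group,
                subtaskFor(group, maxParallelism, 2),
                subtaskFor(group, maxParallelism, 4));
        }
    }
}
```

This is also why `maxParallelism` has to stay the same across restores: changing it would change which key group each key maps to.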

9) How does garbage collection of temp CEP state work, or will it stay
forever? For tracing/investigation reasons I can imagine that purging it
at the earliest possible time is not always the best option. Maybe after
30 days or so.
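On question 9, the reply above explains that FlinkCEP cleans up state once the `.within()` time horizon expires. A minimal plain-Java sketch of that pruning, with hypothetical names and simplified boundary handling: each partial match remembers the event time of its first event and is dropped once the watermark passes start plus window.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: drop partial matches once their within() window has passed in event time.
final class WindowPruning {
    // A partial match whose first event happened at startMillis (event time).
    record PartialMatch(String key, long startMillis) {}

    private final long windowMillis;
    private final List<PartialMatch> partials = new ArrayList<>();

    WindowPruning(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    void add(PartialMatch pm) {
        partials.add(pm);
    }

    // Called as the watermark advances; removes and returns the expired partial matches.
    // Boundary handling is simplified: expired as soon as start + window <= watermark.
    List<PartialMatch> onWatermark(long watermarkMillis) {
        List<PartialMatch> expired = new ArrayList<>();
        Iterator<PartialMatch> it = partials.iterator();
        while (it.hasNext()) {
            PartialMatch pm = it.next();
            if (pm.startMillis() + windowMillis <= watermarkMillis) {
                expired.add(pm);
                it.remove(); // the state for this partial match is released here
            }
        }
        return expired;
    }

    int size() {
        return partials.size();
    }

    public static void main(String[] args) {
        long day = 86_400_000L;
        WindowPruning state = new WindowPruning(30 * day);
        state.add(new PartialMatch("k1", 0));
        state.add(new PartialMatch("k2", 10 * day));
        // At day 35 only k1's window (day 0 to day 30) has expired.
        System.out.println(state.onWatermark(35 * day) + ", still open: " + state.size());
    }
}
```

If expired partial matches should remain available for investigation, one option is to write them to a separate sink when they expire rather than keep them in CEP state.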

10) Are there strategies to minimize temp CEP state? In SQL queries you
filter first on the "smallest" attributes. CEP rules form a sequence.
Hence that approach will not work. Is that an issue at all? What are
practical limits on the CEP temp state storage engine?
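Question 10 is answered above with a pointer to the broadcast-state approach of the fraud-detection demo [3], in which rules are data rather than compiled patterns. Rules held as data also allow a simple pre-filter: an event is stored only if some active rule references its type, and only for as long as the widest rule window. A plain-Java sketch of that idea; the `Rule` and `Event` shapes are hypothetical and this is not code from the demo.

```java
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: rules as data, used to decide which events are worth keeping in state.
final class RuleFilter {
    record Rule(String name, List<String> eventTypes, Duration window) {}
    record Event(String key, String type, long timestampMillis) {}

    private final Set<String> relevantTypes;
    private final Duration widestWindow;

    RuleFilter(List<Rule> rules) {
        // Union of all event types that any active rule refers to.
        this.relevantTypes = rules.stream()
            .flatMap(r -> r.eventTypes().stream())
            .collect(Collectors.toSet());
        // No event needs to be kept longer than the widest rule window.
        this.widestWindow = rules.stream()
            .map(Rule::window)
            .max(Duration::compareTo)
            .orElse(Duration.ZERO);
    }

    // Events that no rule can use are never stored.
    boolean shouldStore(Event e) {
        return relevantTypes.contains(e.type());
    }

    // Stored events older than the widest window can be evicted.
    boolean isExpired(Event e, long nowMillis) {
        return nowMillis - e.timestampMillis() > widestWindow.toMillis();
    }

    public static void main(String[] args) {
        RuleFilter filter = new RuleFilter(List.of(
            new Rule("a-b-c", List.of("A", "B", "C"), Duration.ofDays(30)),
            new Rule("a-d", List.of("A", "D"), Duration.ofDays(180))));
        System.out.println(filter.shouldStore(new Event("k1", "X", 0))); // prints false: no rule uses X
    }
}
```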

11) Occasionally we need to process about 200 files at once. Can I speed
things up by processing all files in parallel on multiple nodes, despite
their order (CEP use case)? This would only work if FlinkCEP in step 1
simply filters on all relevant events of a sequence and updates state, and
in step 2, after the files are processed, evaluates whether the updated
state matches the sequences.
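The two-step idea in question 11 can be sketched in plain Java: read all files in parallel, group events by key, sort each key's events by timestamp, and only then evaluate the sequence rules. This is a batch illustration with hypothetical names, not how FlinkCEP executes; the sequence check ignores the time window and allows other events in between (relaxed contiguity).

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical two-phase sketch: read files in parallel, then evaluate each key in time order.
final class TwoPhase {
    record Event(String key, String type, long timestampMillis) {}

    // Phase 1: events from all files, read in parallel and grouped by key.
    // Each key's events are then sorted by timestamp, whichever file they came from.
    static Map<String, List<Event>> groupAndSort(List<List<Event>> files) {
        return files.parallelStream()
            .flatMap(List::stream)
            .collect(Collectors.groupingBy(
                Event::key,
                Collectors.collectingAndThen(
                    Collectors.toList(),
                    (List<Event> events) -> events.stream()
                        .sorted(Comparator.comparingLong(Event::timestampMillis))
                        .collect(Collectors.toList()))));
    }

    // Phase 2: does the ordered history contain the sequence (other events may occur in between)?
    // The time window is deliberately left out of this sketch.
    static boolean hasSequence(List<Event> ordered, List<String> sequence) {
        int next = 0;
        for (Event e : ordered) {
            if (next < sequence.size() && e.type().equals(sequence.get(next))) {
                next++;
            }
        }
        return next == sequence.size();
    }

    public static void main(String[] args) {
        List<Event> file1 = List.of(new Event("k1", "C", 30), new Event("k2", "A", 5));
        List<Event> file2 = List.of(new Event("k1", "A", 10), new Event("k1", "B", 20));
        Map<String, List<Event>> byKey = groupAndSort(List.of(file1, file2));
        System.out.println(hasSequence(byKey.get("k1"), List.of("A", "B", "C"))); // prints true
    }
}
```

In a Flink job, the same split would come from `keyBy` (parallelism across keys) combined with event-time ordering within each key.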

12) Schema changes in the input files: Occasionally the DB source system
schema is changed, and not always in a backwards-compatible way (new
fields inserted in the middle), and the export then also has the new field
in the middle. This means that starting from a specific (file) date, I need
to consider a different schema. This must also be handled when re-running
files for the last month, because of corrections provided. And if the file
format has changed somewhere in the middle ...
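For question 12, if each schema change has a known effective date, the parsing step could choose the column layout by the file's export date, so that re-running last month's files automatically uses the layout that was valid at the time. A plain-Java sketch, assuming delimited files whose export date is known (e.g. from the filename); the column names are made up.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical sketch: parse each file with the column layout that was valid on its export date.
final class SchemaByDate {
    private final NavigableMap<LocalDate, List<String>> layouts = new TreeMap<>();

    // Register a column layout that applies from effectiveFrom onwards.
    void register(LocalDate effectiveFrom, List<String> columns) {
        layouts.put(effectiveFrom, columns);
    }

    // The layout in force on fileDate is the latest one registered on or before that date.
    List<String> layoutFor(LocalDate fileDate) {
        Map.Entry<LocalDate, List<String>> entry = layouts.floorEntry(fileDate);
        if (entry == null) {
            throw new IllegalArgumentException("no layout known for " + fileDate);
        }
        return entry.getValue();
    }

    // Turn one delimited line into a name -> value map, so later stages never rely on positions.
    Map<String, String> parse(LocalDate fileDate, String line, String delimiter) {
        List<String> columns = layoutFor(fileDate);
        String[] values = line.split(Pattern.quote(delimiter), -1);
        if (values.length != columns.size()) {
            throw new IllegalArgumentException(
                "expected " + columns.size() + " fields but got " + values.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            fields.put(columns.get(i), values[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        SchemaByDate schemas = new SchemaByDate();
        schemas.register(LocalDate.of(2019, 1, 1), List.of("id", "amount", "modified"));
        // From 2020-02-01 a "currency" column was inserted in the middle (made-up example).
        schemas.register(LocalDate.of(2020, 2, 1), List.of("id", "currency", "amount", "modified"));

        System.out.println(schemas.parse(LocalDate.of(2020, 1, 15), "1;10;2020-01-15", ";"));
        System.out.println(schemas.parse(LocalDate.of(2020, 2, 3), "1;EUR;10;2020-02-03", ";"));
    }
}
```

Because records are accessed by column name afterwards, a field inserted in the middle only requires registering a new layout, not changing the CEP conditions.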

thanks a lot for your time and your help
Juergen