Amazing content, thanks for asking and answering.

On Fri, Feb 21, 2020 at 5:04 AM Juergen Donnerstag <juergen.donners...@gmail.com> wrote:
> thanks a lot
> Juergen
>
> On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>
>> Hi Juergen,
>>
>> I will reply to your questions inline. As a general comment, I would suggest also having a look at [3] so that you get an idea of some of the alternatives. With that said, here come the answers :)
>>
>> 1) We receive files every day, which are exports from some database tables, containing ONLY the changes from that day. Most tables have modify-columns. Even though they are files, because they contain only changes, I believe the file records should be considered events in Flink terminology. Is that assumption correct?
>>
>> -> Yes, I think your assumption is correct.
>>
>> 2) The records within the DB export files are NOT in chronological order, and we cannot change the export. Our use case is a "complex event processing" case (FlinkCEP) with rules like "keyBy(someKey); if first A, then B, then C within 30 days, then do something". Does that work with FlinkCEP even though the events/records are not in chronological order within the file? The files are 100 MB to 20 GB in size. Do I need to sort the files before CEP processing?
>>
>> -> Flink CEP also works in event time, and the re-ordering can be done by Flink.
>>
>> 3) Occasionally some crazy people manually "correct" DB records within the database and manually trigger a re-export of ALL of the changes for that respective day (e.g. last week's Tuesday). Consequently we receive a correction file: same filename, but with "_1" appended. All filenames include the date (of the original export). What are the options to handle that case (besides telling the DB admins not to, which we did already)? Regular checkpoints and re-processing of all files since then? What happens to the CEP state? Will it be checkpointed as well?
>>
>> -> If you require re-processing, then I would say that your best option is what you described. The other option would be to keep everything in Flink state until you are sure that no more corrections will come. In this case, you have to somehow issue the "correction" in a way that the downstream system can understand what to correct and how. Keep in mind that this may be an expensive operation, because everything has to be kept in state for longer.
>>
>> 4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a problem?
>>
>> -> The only thing to consider is the size of your state. Time is not necessarily an issue. If your state for these 180 days is a couple of MBs, then you have no problem. If it grows fast, then you have to provision your cluster accordingly.
>>
>> 5) We also have CEP rules that must fire if, after a start sequence matched, the remaining sequence did NOT match within a configured window. E.g. if A, then B, but C did not occur within 30 days since A. Is that supported by FlinkCEP? I couldn't find a working example.
>>
>> -> You can have a look at [1] for the supported pattern combinations, and at [2] for some tests of different pattern combinations.
>>
>> 6) We expect 30-40 CEP rules. How can we estimate the required storage size for the temporary CEP state? Is there some sort of formula considering the number of rules, number of records per file or day, record size, window, number of records matched per sequence, number of keyBy grouping keys, ...?
>>
>> -> In FlinkCEP, each pattern becomes a single operator. This means that you will have 30-40 operators in your job graph, each with its own state. This can become heavy, but once again it depends on your workload. I cannot give an estimate, because in CEP, in order to guarantee correct ordering of events in an unordered stream, the library sometimes has to keep more records in state than will be presented at the end.
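Even though there is no exact formula, a rough worst-case envelope can be sketched from the quantities listed in question 6. This is an editor's back-of-envelope illustration, not a FlinkCEP formula; every number below is a placeholder assumption, and real CEP state can differ substantially in either direction:

```java
// Back-of-envelope upper bound for buffered CEP state.
// All inputs are assumptions to be replaced with measured values.
class StateEstimate {

    // Worst case: every event of every key stays buffered for the whole
    // window, once per pattern (each CEP pattern is its own operator,
    // each with its own copy of the keyed state).
    static long estimateBytes(long keys, long eventsPerKeyPerDay,
                              long windowDays, long bytesPerRecord,
                              long patterns) {
        return keys * eventsPerKeyPerDay * windowDays * bytesPerRecord * patterns;
    }

    public static void main(String[] args) {
        // Hypothetical workload: 1M keys, 2 events/key/day, 180-day window,
        // 200 bytes/record, 40 patterns.
        long bytes = estimateBytes(1_000_000, 2, 180, 200, 40);
        System.out.println((bytes / (1024L * 1024 * 1024)) + " GiB worst-case upper bound");
    }
}
```

The point of such an envelope is mainly to decide early whether the 180-day window (question 4) forces you toward a disk-backed state backend like RocksDB rather than keeping state on the heap.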
>> Have you considered going with a solution based on a ProcessFunction and broadcast state? This will also allow you to have a more dynamic set-up where patterns can be added at runtime, and it will allow you to do any optimizations specific to your workload ;) For a discussion on this, check [3]. In addition, it will allow you to "multiplex" many patterns into a single operator, thus potentially minimizing the number of copies of the state you keep.
>>
>> 7) I can imagine that for debugging reasons it would be good if we were able to query the temporary CEP state. What is the (CEP) schema used to persist the CEP state, and how can we query it? And does such a query work on the whole cluster or only per node (e.g. because of shuffles, with nodes responsible only for a portion of the events)?
>>
>> -> Unfortunately the state in CEP is not queryable, so I am not sure you can inspect it at runtime.
>>
>> 8) I understand state is stored per node. What happens if I want to add or remove nodes? Will the state still be found, despite it being stored on another node? I read that I need to be equally careful when changing rules? Or is that a different issue?
>>
>> -> Rescaling a Flink job is not done automatically. You need to take a savepoint and then relaunch your job with a different parallelism. Updating a rule is not supported in CEP, as changing a rule would imply that (potentially) the state should change. But what you could do is take a savepoint, remove the old pattern, add a new one (the updated one), and tell Flink to ignore the state of the previous operator (as said earlier, each CEP pattern is translated to an operator).
>>
>> 9) How does garbage collection of temporary CEP state work, or will it stay forever? For tracing/investigation reasons I can imagine that purging it at the earliest possible time is not always the best option. Maybe only after 30 days or so.
>>
>> -> CEP cleans state after the time horizon (specified with the .within() clause) expires.
>>
>> 10) Are there strategies to minimize temporary CEP state? In SQL queries you filter first on the "smallest" attributes. CEP rules form a sequence, hence that approach will not work. Is that an issue at all? What are the practical limits of the CEP temporary state storage engine?
>>
>> -> Such optimizations are not supported out of the box. I would recommend going with the broadcast state approach in [3].
>>
>> 11) Occasionally we need to process about 200 files at once. Can I speed things up by processing all files in parallel on multiple nodes, despite their sequence (CEP use case)? This would only work if FlinkCEP in step 1 simply filtered on all relevant events of a sequence and updated state, and in a step 2 - after the files are processed - evaluated whether the updated state meets the sequences.
>>
>> 12) Schema changes in the input files: Occasionally the DB source system schema is changed, and not always in a backwards-compatible way (new fields inserted in the middle), and the export will then also have the field in the middle. This means that, starting from a specific (file) date, I need to consider a different schema. This must also be handled when re-running files for the last month, because of corrections provided. And if the file format has changed somewhere in the middle ...
>>
>> -> This seems to be relevant for the "data cleaning" phase, before you send your data to CEP. In this case, if the schema changes, then I assume that you need to update your initial parsing logic, which means taking a savepoint and redeploying the updated job graph with the new input parsing logic (if I understand correctly).
>>
>> thanks a lot for your time and your help
>>
>> I hope that the above helps!
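The absence rule in question 5 ("A, then B, but no C within 30 days of A") can be made concrete independently of the FlinkCEP pattern API (see [1] for the actual constructs such as notFollowedBy and within). The following self-contained sketch evaluates the rule over an already time-sorted batch of events; all names are illustrative and none of this is Flink API:

```java
import java.util.List;

// Editor's sketch of the semantics of "A, then B, and no C within
// `windowDays` days of A", evaluated over a time-sorted event list.
class AbsenceRule {

    record Event(String type, long day) {}

    // Returns true if some A is followed (inside A's window) by a B,
    // and no C occurs inside that same window. Here B is also required
    // to fall inside the window; adjust if your rule differs.
    static boolean matches(List<Event> events, long windowDays) {
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).type().equals("A")) continue;
            long deadline = events.get(i).day() + windowDays;
            boolean sawB = false, sawC = false;
            for (int j = i + 1; j < events.size(); j++) {
                Event e = events.get(j);
                if (e.day() > deadline) break;       // outside A's window
                if (e.type().equals("B")) sawB = true;
                if (e.type().equals("C")) sawC = true;
            }
            if (sawB && !sawC) return true;
        }
        return false;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(new Event("A", 1), new Event("B", 5));
        System.out.println("fires: " + matches(events, 30));
    }
}
```

Note that in a real stream such a rule can only fire once event time passes A's 30-day deadline, which is exactly why the partial match must be kept in state for the whole window (questions 4 and 6).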
>> Cheers,
>> Kostas
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
>> [2] https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
>> [3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
>>
>> On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag <juergen.donners...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > we're in the very early stages of evaluating options. I'm not a Flink expert, but I did read some of the docs and watched videos. Could you please help me understand if and how certain of our requirements are covered by Flink (CEP)? Is this mailing list the right channel for such questions?
>> >
>> > [the twelve questions quoted above]
>> >
>> > thanks a lot for your time and your help
>> > Juergen

--
Oytun Tez
M O T A W O R D | CTO & Co-Founder
oy...@motaword.com
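On question 12, one pragmatic pattern for the data-cleaning phase (an editor's assumption, not something Flink provides) is to key the parser off the export date embedded in the filename, so that re-running old correction files picks the schema that was valid at export time:

```java
import java.time.LocalDate;
import java.util.NavigableMap;
import java.util.TreeMap;

// Editor's sketch: map a file's export date to the schema version that
// was in force on that date. Dates and version numbers are placeholders.
class SchemaRegistry {

    // Export date from which each schema version became effective.
    private static final NavigableMap<LocalDate, Integer> VERSIONS = new TreeMap<>();
    static {
        VERSIONS.put(LocalDate.MIN, 1);
        VERSIONS.put(LocalDate.of(2020, 3, 1), 2); // hypothetical: field inserted mid-record
    }

    // Pick the schema in force on the file's export date, so reprocessing
    // an old correction file ("_1" suffix) still parses correctly.
    static int versionFor(LocalDate exportDate) {
        return VERSIONS.floorEntry(exportDate).getValue();
    }

    public static void main(String[] args) {
        System.out.println("2020-02-01 -> schema v" + versionFor(LocalDate.of(2020, 2, 1)));
        System.out.println("2020-03-15 -> schema v" + versionFor(LocalDate.of(2020, 3, 15)));
    }
}
```

This keeps schema selection out of the CEP job itself: the parser emits one normalized record type, and only the parsing logic (plus this table) needs redeploying when the source schema changes.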