GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/1557
[FLINK-3216] [FLINK-3217] [cep] Initial version of CEP library
This PR is the first version of Flink's CEP library.
The key components are the `NFA` and the `SharedBuffer`, which the `NFA` uses to
efficiently maintain the state of multiple non-deterministic runs. The `NFA`
implementation is strongly based on this
[paper](https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf).
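As a rough illustration of what "multiple non-deterministic runs" means, the following plain-Java sketch keeps a list of partial matches and advances each of them per event. It is not the code from this PR: `MiniNfa`, `Stage` and `Run` are hypothetical names, events are plain strings, there is no time window, and each run copies its matched events instead of sharing them, which is exactly the overhead a shared buffer is meant to avoid.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical, simplified stand-in for an NFA over a linear pattern (not Flink's implementation). */
class MiniNfa {

    /** One pattern state: a name, a filter condition, and how it follows the previous state. */
    static final class Stage {
        final String name;
        final Predicate<String> condition;
        final boolean strict; // true = must match the very next event, false = events may be skipped

        Stage(String name, Predicate<String> condition, boolean strict) {
            this.name = name;
            this.condition = condition;
            this.strict = strict;
        }
    }

    /** One partial match ("run"): the stage it waits for and the events matched so far. */
    static final class Run {
        final int nextStage;
        final Map<String, String> matched;

        Run(int nextStage, Map<String, String> matched) {
            this.nextStage = nextStage;
            this.matched = matched;
        }
    }

    private final List<Stage> stages;
    private List<Run> runs = new ArrayList<>();

    MiniNfa(List<Stage> stages) {
        this.stages = stages;
    }

    /** Feeds one event into all runs and returns every match completed by this event. */
    List<Map<String, String>> process(String event) {
        List<Run> nextRuns = new ArrayList<>();
        List<Map<String, String>> completed = new ArrayList<>();

        // Every event may also start a new run, so a fresh candidate is added each time.
        List<Run> candidates = new ArrayList<>(runs);
        candidates.add(new Run(0, new LinkedHashMap<>()));

        for (Run run : candidates) {
            Stage stage = stages.get(run.nextStage);
            if (stage.condition.test(event)) {
                // Take the event: copy the partial match and advance by one stage.
                Map<String, String> matched = new LinkedHashMap<>(run.matched);
                matched.put(stage.name, event);
                if (run.nextStage + 1 == stages.size()) {
                    completed.add(matched);
                } else {
                    nextRuns.add(new Run(run.nextStage + 1, matched));
                }
            }
            // A started run waiting on a relaxed stage also survives unchanged (the event is skipped).
            // This "take and skip" branching is the source of non-determinism.
            if (!stage.strict && run.nextStage > 0) {
                nextRuns.add(run);
            }
        }
        runs = nextRuns;
        return completed;
    }

    public static void main(String[] args) {
        MiniNfa nfa = new MiniNfa(Arrays.asList(
                new Stage("start", e -> true, true),
                new Stage("middle", e -> e.equals("b"), true),
                new Stage("end", e -> e.equals("c"), false)));
        for (String event : new String[] {"a", "b", "x", "c"}) {
            for (Map<String, String> match : nfa.process(event)) {
                System.out.println(match); // prints {start=a, middle=b, end=c}
            }
        }
    }
}
```

Because every run holds its own copy of the matched events, memory grows with the number of runs; storing events once and letting runs point into a common structure removes that duplication.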
To define the `NFA`, the library provides the pattern API. The
pattern API lets you easily construct complex patterns with type and filter
conditions. The specified pattern is then compiled into an `NFA` which is
responsible for detecting the patterns. See the online documentation for a full
specification of the supported operations (docs/libs/cep/index.md).
To run the `NFA`, the library adds two custom stream operators:
`CEPPatternOperator` and `KeyedCEPPatternOperator`. The former is used for
non-keyed input streams and the latter for keyed data streams. The
right operator is selected transparently via the `CEP.from(input, pattern)`
method.
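The practical difference between the two cases can be sketched in plain Java. The sketch below is not the operator code from this PR: `Matcher` and `KeyedMatchers` are hypothetical names, the pattern is hard-coded to "warn" directly followed by "critical", and the assumption that the keyed variant keeps independent matching state per key is mine.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch contrasting one global matcher with one matcher per key. */
class KeyedMatchingSketch {

    /** Detects "warn" directly followed by "critical" in the events it is given. */
    static final class Matcher {
        private boolean sawWarn;

        boolean process(String name) {
            boolean matched = sawWarn && name.equals("critical");
            sawWarn = name.equals("warn");
            return matched;
        }
    }

    /** Keyed variant: matching state is created lazily and kept separately for each key. */
    static final class KeyedMatchers<K> {
        private final Map<K, Matcher> perKey = new HashMap<>();

        boolean process(K key, String name) {
            return perKey.computeIfAbsent(key, k -> new Matcher()).process(name);
        }
    }

    public static void main(String[] args) {
        int[] ids = {1, 2, 1};
        String[] names = {"warn", "critical", "critical"};

        Matcher global = new Matcher();
        KeyedMatchers<Integer> keyed = new KeyedMatchers<>();
        for (int i = 0; i < ids.length; i++) {
            boolean globalMatch = global.process(names[i]);
            boolean keyedMatch = keyed.process(ids[i], names[i]);
            System.out.println(ids[i] + "/" + names[i]
                    + " global=" + globalMatch + " keyed=" + keyedMatch);
        }
    }
}
```

With these three events the global matcher reports a match on the second event, which mixes ids 1 and 2, while the keyed variant only reports a match on the third event, where both events belong to id 1.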
`CEP.from(input, pattern)` returns a `PatternStream` which contains the
matched event sequences. The event sequences can be processed by specifying a
`PatternSelectFunction` or a `PatternFlatSelectFunction`. Both functions receive
the detected pattern as a `Map<String, T>` where `T` is the type of the input
data stream. Each event is matched against a state of the pattern, and the name
of that state is the event's key in the map.
An example of the API can be seen next:
```
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Event> input = ...
// Key the stream by event id so that patterns are detected per id.
DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
    @Override
    public Integer getKey(Event value) throws Exception {
        return value.getId();
    }
});
// "start" accepts any event, "middle" must directly follow it,
// "end" may follow later; the whole match must occur within 10 seconds.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("middle").where(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("name");
        }
    }).followedBy("end").where(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));
PatternStream<Event> patternStream = CEP.from(partitionedInput, pattern);
// Turn every detected event sequence into an alert.
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, Event> pattern) throws Exception {
        return new Alert(pattern.get("start"), pattern.get("end"));
    }
});
```
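The difference between the two kinds of select function can be shown without Flink on the classpath. The following is a minimal sketch under stated assumptions: `Select` and `FlatSelect` are hypothetical stand-ins for `PatternSelectFunction` and `PatternFlatSelectFunction`, a plain `List` plays the role of the output collector, and the library's real signatures may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of consuming matches that are keyed by pattern state name. */
class SelectSketch {

    /** Stand-in for a select function: exactly one result per match. */
    interface Select<T, R> {
        R select(Map<String, T> match);
    }

    /** Stand-in for a flat select function: zero or more results per match. */
    interface FlatSelect<T, R> {
        void flatSelect(Map<String, T> match, List<R> out);
    }

    static <T, R> List<R> applySelect(List<Map<String, T>> matches, Select<T, R> fn) {
        List<R> results = new ArrayList<>();
        for (Map<String, T> match : matches) {
            results.add(fn.select(match));
        }
        return results;
    }

    static <T, R> List<R> applyFlatSelect(List<Map<String, T>> matches, FlatSelect<T, R> fn) {
        List<R> results = new ArrayList<>();
        for (Map<String, T> match : matches) {
            fn.flatSelect(match, results);
        }
        return results;
    }

    public static void main(String[] args) {
        // One detected sequence: state name -> matched event (events are plain strings here).
        Map<String, String> match = new LinkedHashMap<>();
        match.put("start", "a");
        match.put("end", "c");
        List<Map<String, String>> matches = Collections.singletonList(match);

        List<String> alerts = applySelect(matches, m -> m.get("start") + "->" + m.get("end"));
        System.out.println(alerts); // prints [a->c]

        List<String> events = applyFlatSelect(matches, (m, out) -> out.addAll(m.values()));
        System.out.println(events); // prints [a, c]
    }
}
```

A flat select function is the better fit when a single match should produce several output records or none at all.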
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink cep
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1557.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1557
commit 5deda066c5f4851a426b15a05994be0796f5e6f3
Author: Till Rohrmann
Date: 2016-01-14T09:04:23Z
[FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition
Implements NFA using the SharedBuffer
Implements NFACompiler to compile a Pattern into a NFA
Add CEP operator
Makes NFA and SharedBuffer serializable
Add serializability support to SharedBuffer and NFA
Add keyed cep pattern operator
commit 6459946a62985e08c50ca397447af42662ec6558
Author: Till Rohrmann
Date: 2016-01-28T15:41:01Z
Adds CEP documentation
Adds online documentation for the CEP library