[FLINK-6244] [cep] Emit timeouted Patterns as Side Output This closes #4320
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ed5815e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ed5815e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ed5815e Branch: refs/heads/master Commit: 6ed5815e8b3f06b95470d1a4598fede0a9be9280 Parents: 9995588 Author: Dawid Wysakowicz <dwysakow...@apache.org> Authored: Fri Aug 11 11:56:28 2017 +0200 Committer: Dawid Wysakowicz <dwysakow...@apache.org> Committed: Wed Aug 23 08:35:29 2017 +0200 ---------------------------------------------------------------------- docs/dev/libs/cep.md | 54 ++- .../apache/flink/cep/scala/PatternStream.scala | 231 ++++++++--- ...StreamScalaJavaAPIInteroperabilityTest.scala | 45 +- .../org/apache/flink/cep/PatternStream.java | 412 +++++++++++-------- .../main/java/org/apache/flink/cep/nfa/NFA.java | 3 +- .../AbstractKeyedCEPPatternOperator.java | 45 +- .../flink/cep/operator/CEPOperatorUtils.java | 302 ++++++++++---- .../cep/operator/FlatSelectCepOperator.java | 67 +++ .../operator/FlatSelectTimeoutCepOperator.java | 130 ++++++ .../cep/operator/KeyedCEPPatternOperator.java | 83 ---- .../flink/cep/operator/SelectCepOperator.java | 56 +++ .../cep/operator/SelectTimeoutCepOperator.java | 119 ++++++ .../TimeoutKeyedCEPPatternOperator.java | 92 ----- .../TimestampedSideOutputCollector.java | 82 ++++ .../cep/operator/CEPMigration11to13Test.java | 61 +-- .../flink/cep/operator/CEPMigrationTest.java | 66 +-- .../flink/cep/operator/CEPOperatorTest.java | 285 +++++-------- .../flink/cep/operator/CEPRescalingTest.java | 8 +- .../cep/operator/CepOperatorTestUtilities.java | 113 +++++ 19 files changed, 1441 insertions(+), 813 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/docs/dev/libs/cep.md ---------------------------------------------------------------------- diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md 
index bddb9b2..4b13bb3 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -1316,63 +1316,75 @@ and `flatSelect` API calls allow a timeout handler to be specified. This timeout partial event sequence. The timeout handler receives all the events that have been matched so far by the pattern, and the timestamp when the timeout was detected. +In order to treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as +parameters + + * `PatternTimeoutFunction`/`PatternFlatTimeoutFunction` + * [OutputTag]({{ site.baseurl }}/dev/stream/side_output.html) for the side output in which the timed-out matches will be returned + * and the known `PatternSelectFunction`/`PatternFlatSelectFunction`. + <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> -In order to treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as -the first parameter a `PatternTimeoutFunction`/`PatternFlatTimeoutFunction` and as second parameter the known -`PatternSelectFunction`/`PatternFlatSelectFunction`. The return type of the timeout function can be different from the -select function. The timeout event and the select event are wrapped in `Either.Left` and `Either.Right` respectively -so that the resulting data stream is of type `org.apache.flink.types.Either`.
-{% highlight java %} +~~~java PatternStream<Event> patternStream = CEP.pattern(input, pattern); -DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select( +OutputTag<TimeoutEvent> outputTag = new OutputTag<TimeoutEvent>("side-output"){}; + +SingleOutputStreamOperator<ComplexEvent> result = patternStream.select( new PatternTimeoutFunction<Event, TimeoutEvent>() {...}, + outputTag, new PatternSelectFunction<Event, ComplexEvent>() {...} ); -DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSelect( +DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag); + +SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect( new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...}, + outputTag, new PatternFlatSelectFunction<Event, ComplexEvent>() {...} ); -{% endhighlight %} + +DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag); +~~~ </div> <div data-lang="scala" markdown="1"> -In order to treat partial patterns, the `select` API call offers an overloaded version which takes as the first parameter a timeout function and as second parameter a selection function. -The timeout function is called with a map of string-event pairs of the partial match which has timed out and a long indicating when the timeout occurred. -The string is defined by the name of the pattern to which the event has been matched. -The timeout function returns exactly one result per call. -The return type of the timeout function can be different from the select function. -The timeout event and the select event are wrapped in `Left` and `Right` respectively so that the resulting data stream is of type `Either`.
-{% highlight scala %} +~~~scala val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) -DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.select{ +val outputTag = OutputTag[TimeoutEvent]("side-output") + +val result: DataStream[ComplexEvent] = patternStream.select(outputTag){ (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent() } { pattern: Map[String, Iterable[Event]] => ComplexEvent() } -{% endhighlight %} + +val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag) +~~~ The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function. In contrast to the `select` functions, the `flatSelect` functions are called with a `Collector`. The collector can be used to emit an arbitrary number of events. -{% highlight scala %} +~~~scala val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) -DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.flatSelect{ +val outputTag = OutputTag[TimeoutEvent]("side-output") + +val result: DataStream[ComplexEvent] = patternStream.flatSelect(outputTag){ (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) => out.collect(TimeoutEvent()) } { (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) => out.collect(ComplexEvent()) } -{% endhighlight %} + +val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag) +~~~ </div> </div> http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala index d270ef7..eacfa87 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala @@ -17,19 +17,14 @@ */ package org.apache.flink.cep.scala -import java.util.{List => JList, Map => JMap} +import java.util.{UUID, List => JList, Map => JMap} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.cep.{EventComparator, PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream} import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.cep.scala.pattern.Pattern +import org.apache.flink.cep.{EventComparator, PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream} import org.apache.flink.streaming.api.scala.{asScalaStream, _} import org.apache.flink.util.Collector -import org.apache.flink.types.{Either => FEither} -import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2} -import java.lang.{Long => JLong} - -import org.apache.flink.cep.operator.CEPOperatorUtils -import org.apache.flink.cep.scala.pattern.Pattern import scala.collection.Map @@ -84,37 +79,54 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { * pattern sequence. * @tparam L Type of the resulting timeout event * @tparam R Type of the resulting event + * @deprecated Use the version that returns timeouted events as a side-output * @return Data stream of either type which contains the resulting events and resulting timeout * events. 
*/ + @deprecated def select[L: TypeInformation, R: TypeInformation]( patternTimeoutFunction: PatternTimeoutFunction[T, L], patternSelectFunction: PatternSelectFunction[T, R]) : DataStream[Either[L, R]] = { + val outputTag = OutputTag[L](UUID.randomUUID().toString) + val mainStream = select(outputTag, patternTimeoutFunction, patternSelectFunction) + mainStream.connect(mainStream.getSideOutput[L](outputTag)).map(r => Right(r), l => Left(l)) + } - val patternStream = CEPOperatorUtils.createTimeoutPatternStream( - jPatternStream.getInputStream, - jPatternStream.getPattern, - jPatternStream.getComparator) - + /** + * Applies a select function to the detected pattern sequence. For each pattern sequence the + * provided [[PatternSelectFunction]] is called. The pattern select function can produce + * exactly one resulting element. + * + * Additionally a timeout function is applied to partial event patterns which have timed out. For + * each partial pattern sequence the provided [[PatternTimeoutFunction]] is called. The pattern + * timeout function has to produce exactly one resulting timeout event. + * + * You can get the stream of timeouted matches using [[DataStream.getSideOutput()]] on the + * [[DataStream]] resulting from the windowed operation with the same [[OutputTag]]. + * + * @param outputTag [[OutputTag]] that identifies side output with timeouted patterns + * @param patternTimeoutFunction The pattern timeout function which is called for each partial + * pattern sequence which has timed out. + * @param patternSelectFunction The pattern select function which is called for each detected + * pattern sequence. + * @tparam L Type of the resulting timeout event + * @tparam R Type of the resulting event + * @return Data stream which contains the resulting elements with the resulting timeout elements + * in a side output. 
+ */ + def select[L: TypeInformation, R: TypeInformation]( + outputTag: OutputTag[L], + patternTimeoutFunction: PatternTimeoutFunction[T, L], + patternSelectFunction: PatternSelectFunction[T, R]) + : DataStream[R] = { val cleanedSelect = cleanClosure(patternSelectFunction) val cleanedTimeout = cleanClosure(patternTimeoutFunction) - implicit val eitherTypeInfo = createTypeInformation[Either[L, R]] - - asScalaStream(patternStream).map[Either[L, R]] { - input: FEither[FTuple2[JMap[String, JList[T]], JLong], JMap[String, JList[T]]] => - if (input.isLeft) { - val timeout = input.left() - val timeoutEvent = cleanedTimeout.timeout(timeout.f0, timeout.f1) - val t = Left[L, R](timeoutEvent) - t - } else { - val event = cleanedSelect.select(input.right()) - val t = Right[L, R](event) - t - } - } + asScalaStream( + jPatternStream + .select(outputTag, cleanedTimeout, implicitly[TypeInformation[R]], cleanedSelect) + ) } /** @@ -151,44 +163,58 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { * detected pattern sequence. * @tparam L Type of the resulting timeout event * @tparam R Type of the resulting event + * @deprecated Use the version that returns timeouted events as a side-output * @return Data stream of either type which contains the resulting events and the resulting * timeout events wrapped in a [[Either]] type. 
*/ + @deprecated def flatSelect[L: TypeInformation, R: TypeInformation]( patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L], patternFlatSelectFunction: PatternFlatSelectFunction[T, R]) : DataStream[Either[L, R]] = { - val patternStream = CEPOperatorUtils.createTimeoutPatternStream( - jPatternStream.getInputStream, - jPatternStream.getPattern, - jPatternStream.getComparator - ) - - val cleanedSelect = cleanClosure(patternFlatSelectFunction) - val cleanedTimeout = cleanClosure(patternFlatTimeoutFunction) - implicit val eitherTypeInfo = createTypeInformation[Either[L, R]] - - asScalaStream(patternStream).flatMap[Either[L, R]] { - (input: FEither[FTuple2[JMap[String, JList[T]], JLong], JMap[String, JList[T]]], - collector: Collector[Either[L, R]]) => - - if (input.isLeft()) { - val timeout = input.left() + val outputTag = OutputTag[L]("dummy-timeouted") + val mainStream = flatSelect(outputTag, patternFlatTimeoutFunction, patternFlatSelectFunction) + mainStream.connect(mainStream.getSideOutput[L](outputTag)).map(r => Right(r), l => Left(l)) + } - cleanedTimeout.timeout(timeout.f0, timeout.f1, new Collector[L]() { - override def collect(record: L): Unit = collector.collect(Left(record)) + /** + * Applies a flat select function to the detected pattern sequence. For each pattern sequence + * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can + * produce an arbitrary number of resulting elements. + * + * Additionally a timeout function is applied to partial event patterns which have timed out. For + * each partial pattern sequence the provided [[PatternFlatTimeoutFunction]] is called. The + * pattern timeout function can produce an arbitrary number of resulting timeout events. + * + * You can get the stream of timeouted matches using [[DataStream.getSideOutput()]] on the + * [[DataStream]] resulting from the windowed operation with the same [[OutputTag]]. 
+ * + * @param outputTag [[OutputTag]] that identifies side output with timeouted patterns + * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each + * partially matched pattern sequence which has timed out. + * @param patternFlatSelectFunction The pattern flat select function which is called for each + * detected pattern sequence. + * @tparam L Type of the resulting timeout event + * @tparam R Type of the resulting event + * @return Data stream which contains the resulting elements with the resulting timeout elements + * in a side output. + */ + def flatSelect[L: TypeInformation, R: TypeInformation]( + outputTag: OutputTag[L], + patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L], + patternFlatSelectFunction: PatternFlatSelectFunction[T, R]) + : DataStream[R] = { - override def close(): Unit = collector.close() - }) - } else { - cleanedSelect.flatSelect(input.right, new Collector[R]() { - override def collect(record: R): Unit = collector.collect(Right(record)) + val cleanedSelect = cleanClosure(patternFlatSelectFunction) + val cleanedTimeout = cleanClosure(patternFlatTimeoutFunction) - override def close(): Unit = collector.close() - }) - } - } + asScalaStream( + jPatternStream.flatSelect( + outputTag, + cleanedTimeout, + implicitly[TypeInformation[R]], + cleanedSelect)) } /** @@ -228,9 +254,11 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { * pattern sequence. * @tparam L Type of the resulting timeout event * @tparam R Type of the resulting event + * @deprecated Use the version that returns timeouted events as a side-output * @return Data stream of either type which contain the resulting events and resulting timeout * events. 
*/ + @deprecated def select[L: TypeInformation, R: TypeInformation]( patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L) ( patternSelectFunction: Map[String, Iterable[T]] => R) @@ -252,6 +280,48 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { } /** + * Applies a select function to the detected pattern sequence. For each pattern sequence the + * provided [[PatternSelectFunction]] is called. The pattern select function can produce + * exactly one resulting element. + * + * Additionally a timeout function is applied to partial event patterns which have timed out. For + * each partial pattern sequence the provided [[PatternTimeoutFunction]] is called. The pattern + * timeout function has to produce exactly one resulting element. + * + * You can get the stream of timeouted matches using [[DataStream.getSideOutput()]] on the + * [[DataStream]] resulting from the windowed operation with the same [[OutputTag]]. + * + * @param outputTag [[OutputTag]] that identifies side output with timeouted patterns + * @param patternTimeoutFunction The pattern timeout function which is called for each partial + * pattern sequence which has timed out. + * @param patternSelectFunction The pattern select function which is called for each detected + * pattern sequence. + * @tparam L Type of the resulting timeout event + * @tparam R Type of the resulting event + * @return Data stream of either type which contain the resulting events and resulting timeout + * events. 
+ */ + def select[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])( + patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L) ( + patternSelectFunction: Map[String, Iterable[T]] => R) + : DataStream[R] = { + + val cleanSelectFun = cleanClosure(patternSelectFunction) + val cleanTimeoutFun = cleanClosure(patternTimeoutFunction) + + val patternSelectFun = new PatternSelectFunction[T, R] { + override def select(pattern: JMap[String, JList[T]]): R = + cleanSelectFun(mapToScala(pattern)) + } + val patternTimeoutFun = new PatternTimeoutFunction[T, L] { + override def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L = + cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp) + } + + select(outputTag, patternTimeoutFun, patternSelectFun) + } + + /** * Applies a flat select function to the detected pattern sequence. For each pattern sequence * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function * can produce an arbitrary number of resulting elements. @@ -292,9 +362,11 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { * detected pattern sequence. * @tparam L Type of the resulting timeout event * @tparam R Type of the resulting event + * @deprecated Use the version that returns timeouted events as a side-output * @return Data stream of either type which contains the resulting events and the resulting * timeout events wrapped in a [[Either]] type. */ + @deprecated def flatSelect[L: TypeInformation, R: TypeInformation]( patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit) ( patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit) @@ -319,6 +391,53 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { flatSelect(patternFlatTimeoutFun, patternFlatSelectFun) } + + /** + * Applies a flat select function to the detected pattern sequence. For each pattern sequence + * the provided [[PatternFlatSelectFunction]] is called. 
 The pattern flat select function can + * produce an arbitrary number of resulting elements. + * + * Additionally a timeout function is applied to partial event patterns which have timed out. For + * each partial pattern sequence the provided [[PatternFlatTimeoutFunction]] is called. The + * pattern timeout function can produce an arbitrary number of resulting timeout events. + * + * You can get the stream of timed-out matches using [[DataStream.getSideOutput()]] on the + * [[DataStream]] resulting from the flat select operation with the same [[OutputTag]]. + * + * @param outputTag [[OutputTag]] that identifies side output with timed-out patterns + * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each + * partially matched pattern sequence which has timed out. + * @param patternFlatSelectFunction The pattern flat select function which is called for each + * detected pattern sequence. + * @tparam L Type of the resulting timeout event + * @tparam R Type of the resulting event + * @return Data stream which contains the resulting elements with the resulting timeout elements + * in a side output.
+ */ + def flatSelect[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])( + patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit) ( + patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit) + : DataStream[R] = { + + val cleanSelectFun = cleanClosure(patternFlatSelectFunction) + val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction) + + val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] { + override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit = + cleanSelectFun(mapToScala(pattern), out) + } + + val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] { + override def timeout( + pattern: JMap[String, JList[T]], + timeoutTimestamp: Long, out: Collector[L]) + : Unit = { + cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp, out) + } + } + + flatSelect(outputTag, patternFlatTimeoutFun, patternFlatSelectFun) + } } object PatternStream { @@ -328,7 +447,7 @@ object PatternStream { * @tparam T Type of the events * @return A new pattern stream wrapping the pattern stream from Java APU */ - def apply[T](jPatternStream: JPatternStream[T]) = { + def apply[T](jPatternStream: JPatternStream[T]): PatternStream[T] = { new PatternStream[T](jPatternStream) } } http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala index e2161a0..f3371c8 100644 --- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala +++ 
b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala @@ -21,15 +21,17 @@ import org.apache.flink.api.common.functions.util.ListCollector import org.apache.flink.cep.scala.pattern.Pattern import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap} import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.api.transformations.OneInputTransformation +import org.apache.flink.streaming.api.transformations.{OneInputTransformation, TwoInputTransformation} import org.apache.flink.util.{Collector, TestLogger} import org.apache.flink.types.{Either => FEither} import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2} - import java.lang.{Long => JLong} import java.util.{Map => JMap} import java.util.{List => JList} +import org.apache.flink.cep.operator.{FlatSelectCepOperator, FlatSelectTimeoutCepOperator, SelectCepOperator} +import org.apache.flink.streaming.api.functions.co.CoMapFunction + import scala.collection.JavaConverters._ import scala.collection.Map import org.junit.Assert._ @@ -51,8 +53,8 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger { assertEquals(param, pattern) param.get("begin").get(0) }) - val out = extractUserFunction[StreamMap[JMap[String, JList[(Int, Int)]], (Int, Int)]](result) - .getUserFunction.map(param.mapValues(_.asJava).asJava) + val out = extractUserFunction[SelectCepOperator[(Int, Int), Byte, (Int, Int)]](result) + .getUserFunction.select(param.mapValues(_.asJava).asJava) //verifies output parameter forwarding assertEquals(param.get("begin").get(0), out) } @@ -77,8 +79,8 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger { out.collect(pattern.get("begin").get.head) }) - extractUserFunction[StreamFlatMap[java.util.Map[String, JList[List[Int]]], List[Int]]](result). 
- getUserFunction.flatMap(inParam.mapValues(_.asJava).asJava, outParam) + extractUserFunction[FlatSelectCepOperator[List[Int], Byte, List[Int]]](result). + getUserFunction.flatSelect(inParam.mapValues(_.asJava).asJava, outParam) //verify output parameter forwarding and that flatMap function was actually called assertEquals(inList, outList.get(0)) } @@ -96,28 +98,26 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger { val expectedOutput = List(Right("match"), Right("barfoo"), Left("timeout"), Left("barfoo")) .asJava - val result: DataStream[Either[String, String]] = pStream.flatSelect { - (pattern: Map[String, Iterable[String]], timestamp: Long, out: Collector[String]) => - out.collect("timeout") - out.collect(pattern("begin").head) + val outputTag = OutputTag[Either[String, String]]("timeouted") + val result: DataStream[Either[String, String]] = pStream.flatSelect(outputTag) { + (pattern: Map[String, Iterable[String]], timestamp: Long, + out: Collector[Either[String, String]]) => + out.collect(Left("timeout")) + out.collect(Left(pattern("begin").head)) } { - (pattern: Map[String, Iterable[String]], out: Collector[String]) => + (pattern: Map[String, Iterable[String]], out: Collector[Either[String, String]]) => //verifies input parameter forwarding assertEquals(inParam, pattern) - out.collect("match") - out.collect(pattern("begin").head) + out.collect(Right("match")) + out.collect(Right(pattern("begin").head)) } - val fun = extractUserFunction[ - StreamFlatMap[ - FEither[ - FTuple2[JMap[String, JList[String]], JLong], - JMap[String, JList[String]]], - Either[String, String]]](result) + val fun = extractUserFunction[FlatSelectTimeoutCepOperator[String, Either[String, String], + Either[String, String], Byte]]( + result).getUserFunction - fun.getUserFunction.flatMap(FEither.Right(inParam.mapValues(_.asJava).asJava), output) - fun.getUserFunction.flatMap(FEither.Left(FTuple2.of(inParam.mapValues(_.asJava).asJava, 42L)), - output) + 
fun.getFlatSelectFunction.flatSelect(inParam.mapValues(_.asJava).asJava, output) + fun.getFlatTimeoutFunction.timeout(inParam.mapValues(_.asJava).asJava, 42L, output) assertEquals(expectedOutput, outList) } @@ -129,4 +129,5 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger { .getOperator .asInstanceOf[T] } + } http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java index 555d270..6380375 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java @@ -18,21 +18,18 @@ package org.apache.flink.cep; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.EitherTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.cep.operator.CEPOperatorUtils; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.types.Either; -import org.apache.flink.util.Collector; - -import java.util.List; -import java.util.Map; +import org.apache.flink.util.OutputTag; /** * Stream abstraction for CEP pattern detection. 
A pattern stream is a stream which emits detected @@ -109,6 +106,16 @@ public class PatternStream<T> { } /** + * Invokes the {@link org.apache.flink.api.java.ClosureCleaner} + * on the given function if closure cleaning is enabled in the {@link ExecutionConfig}. + * + * @return The cleaned Function + */ + private <F> F clean(F f) { + return inputStream.getExecutionEnvironment().clean(f); + } + + /** * Applies a select function to the detected pattern sequence. For each pattern sequence the * provided {@link PatternSelectFunction} is called. The pattern select function can produce * exactly one resulting element. @@ -121,13 +128,94 @@ public class PatternStream<T> { * function. */ public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) { - SingleOutputStreamOperator<Map<String, List<T>>> patternStream = - CEPOperatorUtils.createPatternStream(inputStream, pattern, comparator); + return CEPOperatorUtils.createPatternStream(inputStream, pattern, comparator, clean(patternSelectFunction), outTypeInfo); + } - return patternStream.map( - new PatternSelectMapper<>( - patternStream.getExecutionEnvironment().clean(patternSelectFunction))) - .returns(outTypeInfo); + /** + * Applies a select function to the detected pattern sequence. For each pattern sequence the + * provided {@link PatternSelectFunction} is called. The pattern select function can produce + * exactly one resulting element. + * + * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each + * partial pattern sequence the provided {@link PatternTimeoutFunction} is called. The pattern + * timeout function can produce exactly one resulting element. + * + * <p>You can get the stream of timed-out data resulting from the + * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the + * {@link SingleOutputStreamOperator} resulting from the select operation + * with the same {@link OutputTag}. 
+ * + * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns + * @param patternTimeoutFunction The pattern timeout function which is called for each partial + * pattern sequence which has timed out. + * @param patternSelectFunction The pattern select function which is called for each detected + * pattern sequence. + * @param <L> Type of the resulting timeout elements + * @param <R> Type of the resulting elements + * @return {@link DataStream} which contains the resulting elements with the resulting timeout + * elements in a side output. + */ + public <L, R> SingleOutputStreamOperator<R> select( + final OutputTag<L> timeoutOutputTag, + final PatternTimeoutFunction<T, L> patternTimeoutFunction, + final PatternSelectFunction<T, R> patternSelectFunction) { + + TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType( + patternSelectFunction, + PatternSelectFunction.class, + 0, + 1, + new int[]{0, 1, 0}, + new int[]{}, + inputStream.getType(), + null, + false); + + return select( + timeoutOutputTag, + patternTimeoutFunction, + rightTypeInfo, + patternSelectFunction); + } + + /** + * Applies a select function to the detected pattern sequence. For each pattern sequence the + * provided {@link PatternSelectFunction} is called. The pattern select function can produce + * exactly one resulting element. + * + * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each + * partial pattern sequence the provided {@link PatternTimeoutFunction} is called. The pattern + * timeout function can produce exactly one resulting element. + * + * <p>You can get the stream of timed-out data resulting from the + * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the + * {@link SingleOutputStreamOperator} resulting from the select operation + * with the same {@link OutputTag}. 
+ * + * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns + * @param patternTimeoutFunction The pattern timeout function which is called for each partial + * pattern sequence which has timed out. + * @param outTypeInfo Explicit specification of output type. + * @param patternSelectFunction The pattern select function which is called for each detected + * pattern sequence. + * @param <L> Type of the resulting timeout elements + * @param <R> Type of the resulting elements + * @return {@link DataStream} which contains the resulting elements with the resulting timeout + * elements in a side output. + */ + public <L, R> SingleOutputStreamOperator<R> select( + final OutputTag<L> timeoutOutputTag, + final PatternTimeoutFunction<T, L> patternTimeoutFunction, + final TypeInformation<R> outTypeInfo, + final PatternSelectFunction<T, R> patternSelectFunction) { + return CEPOperatorUtils.createTimeoutPatternStream( + inputStream, + pattern, + comparator, + clean(patternSelectFunction), + outTypeInfo, + timeoutOutputTag, + clean(patternTimeoutFunction)); } /** @@ -145,19 +233,21 @@ public class PatternStream<T> { * pattern sequence. * @param <L> Type of the resulting timeout elements * @param <R> Type of the resulting elements + * + * @deprecated Use {@link PatternStream#select(OutputTag, PatternTimeoutFunction, PatternSelectFunction)} + * that returns timeouted events as a side-output + * * @return {@link DataStream} which contains the resulting elements or the resulting timeout * elements wrapped in an {@link Either} type. 
*/ + @Deprecated public <L, R> SingleOutputStreamOperator<Either<L, R>> select( final PatternTimeoutFunction<T, L> patternTimeoutFunction, final PatternSelectFunction<T, R> patternSelectFunction) { - SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream = - CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, comparator); - - TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType( - patternTimeoutFunction, - PatternTimeoutFunction.class, + TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType( + patternSelectFunction, + PatternSelectFunction.class, 0, 1, new int[]{0, 1, 0}, @@ -166,9 +256,9 @@ public class PatternStream<T> { null, false); - TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType( - patternSelectFunction, - PatternSelectFunction.class, + TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType( + patternTimeoutFunction, + PatternTimeoutFunction.class, 0, 1, new int[]{0, 1, 0}, @@ -177,14 +267,22 @@ public class PatternStream<T> { null, false); + final OutputTag<L> outputTag = new OutputTag<L>("dummy-timeouted", leftTypeInfo); + + final SingleOutputStreamOperator<R> mainStream = CEPOperatorUtils.createTimeoutPatternStream( + inputStream, + pattern, + comparator, + clean(patternSelectFunction), + rightTypeInfo, + outputTag, + clean(patternTimeoutFunction)); + + final DataStream<L> timeoutedStream = mainStream.getSideOutput(outputTag); + TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo); - return patternStream.map( - new PatternSelectTimeoutMapper<>( - patternStream.getExecutionEnvironment().clean(patternSelectFunction), - patternStream.getExecutionEnvironment().clean(patternTimeoutFunction) - ) - ).returns(outTypeInfo); + return mainStream.connect(timeoutedStream).map(new CoMapTimeout<>()).returns(outTypeInfo); } /** @@ -227,14 +325,99 @@ public class 
PatternStream<T> { * @return {@link DataStream} which contains the resulting elements from the pattern flat select * function. */ - public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) { - SingleOutputStreamOperator<Map<String, List<T>>> patternStream = - CEPOperatorUtils.createPatternStream(inputStream, pattern, comparator); - - return patternStream.flatMap( - new PatternFlatSelectMapper<>( - patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction) - )).returns(outTypeInfo); + public <R> SingleOutputStreamOperator<R> flatSelect( + final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, + final TypeInformation<R> outTypeInfo) { + return CEPOperatorUtils.createPatternStream( + inputStream, + pattern, + comparator, + clean(patternFlatSelectFunction), + outTypeInfo); + } + + /** + * Applies a flat select function to the detected pattern sequence. For each pattern sequence the + * provided {@link PatternFlatSelectFunction} is called. The pattern flat select function can produce + * an arbitrary number of resulting elements. + * + * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each + * partial pattern sequence the provided {@link PatternFlatTimeoutFunction} is called. The pattern + * flat timeout function can produce an arbitrary number of resulting elements. + * + * <p>You can get the stream of timed-out data resulting from the + * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the + * {@link SingleOutputStreamOperator} resulting from the flat select operation + * with the same {@link OutputTag}. + * + * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns + * @param patternFlatTimeoutFunction The pattern timeout function which is called for each partial + * pattern sequence which has timed out.
+ * @param patternFlatSelectFunction The pattern flat select function which is called for each detected + * pattern sequence. + * @param <L> Type of the resulting timeout elements + * @param <R> Type of the resulting elements + * @return {@link DataStream} which contains the resulting elements with the resulting timeout + * elements in a side output. + */ + public <L, R> SingleOutputStreamOperator<R> flatSelect( + final OutputTag<L> timeoutOutputTag, + final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, + final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) { + + TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType( + patternFlatSelectFunction, + PatternFlatSelectFunction.class, + 0, + 1, + new int[]{0, 1, 0}, + new int[]{1, 0}, + inputStream.getType(), + null, + false); + + return flatSelect(timeoutOutputTag, patternFlatTimeoutFunction, rightTypeInfo, patternFlatSelectFunction); + } + + /** + * Applies a flat select function to the detected pattern sequence. For each pattern sequence the + * provided {@link PatternFlatSelectFunction} is called. The pattern flat select function can produce + * an arbitrary number of resulting elements. + * + * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each + * partial pattern sequence the provided {@link PatternFlatTimeoutFunction} is called. The pattern + * flat timeout function can produce an arbitrary number of resulting elements. + * + * <p>You can get the stream of timed-out data resulting from the + * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the + * {@link SingleOutputStreamOperator} resulting from the flat select operation + * with the same {@link OutputTag}. + * + * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns + * @param patternFlatTimeoutFunction The pattern timeout function which is called for each partial + * pattern sequence which has timed out.
+ * @param patternFlatSelectFunction The pattern select function which is called for each detected + * pattern sequence. + * @param outTypeInfo Explicit specification of output type. + * @param <L> Type of the resulting timeout elements + * @param <R> Type of the resulting elements + * @return {@link DataStream} which contains the resulting elements with the resulting timeout + * elements in a side output. + */ + public <L, R> SingleOutputStreamOperator<R> flatSelect( + final OutputTag<L> timeoutOutputTag, + final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, + final TypeInformation<R> outTypeInfo, + final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) { + + return CEPOperatorUtils.createTimeoutPatternStream( + inputStream, + pattern, + comparator, + clean(patternFlatSelectFunction), + outTypeInfo, + timeoutOutputTag, + clean(patternFlatTimeoutFunction)); } /** @@ -252,17 +435,19 @@ public class PatternStream<T> { * detected pattern sequence. * @param <L> Type of the resulting timeout events * @param <R> Type of the resulting events + * + * @deprecated Use {@link PatternStream#flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)} + * that returns timeouted events as a side-output + * * @return {@link DataStream} which contains the resulting events from the pattern flat select * function or the resulting timeout events from the pattern flat timeout function wrapped in an * {@link Either} type. 
*/ + @Deprecated public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect( final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) { - SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream = - CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, comparator); - TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternFlatTimeoutFunction, PatternFlatTimeoutFunction.class, @@ -285,147 +470,40 @@ public class PatternStream<T> { null, false); - TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo); - - return patternStream.flatMap( - new PatternFlatSelectTimeoutWrapper<>( - patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction), - patternStream.getExecutionEnvironment().clean(patternFlatTimeoutFunction) - ) - ).returns(outTypeInfo); - } - - /** - * Wrapper for a {@link PatternSelectFunction}. 
- * - * @param <T> Type of the input elements - * @param <R> Type of the resulting elements - */ - private static class PatternSelectMapper<T, R> implements MapFunction<Map<String, List<T>>, R> { - private static final long serialVersionUID = 2273300432692943064L; - - private final PatternSelectFunction<T, R> patternSelectFunction; - - public PatternSelectMapper(PatternSelectFunction<T, R> patternSelectFunction) { - this.patternSelectFunction = patternSelectFunction; - } - - @Override - public R map(Map<String, List<T>> value) throws Exception { - return patternSelectFunction.select(value); - } - } - - private static class PatternSelectTimeoutMapper<T, L, R> implements MapFunction<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>, Either<L, R>> { - - private static final long serialVersionUID = 8259477556738887724L; - - private final PatternSelectFunction<T, R> patternSelectFunction; - private final PatternTimeoutFunction<T, L> patternTimeoutFunction; - - public PatternSelectTimeoutMapper( - PatternSelectFunction<T, R> patternSelectFunction, - PatternTimeoutFunction<T, L> patternTimeoutFunction) { - - this.patternSelectFunction = patternSelectFunction; - this.patternTimeoutFunction = patternTimeoutFunction; - } - - @Override - public Either<L, R> map(Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>> value) throws Exception { - if (value.isLeft()) { - Tuple2<Map<String, List<T>>, Long> timeout = value.left(); - - return Either.Left(patternTimeoutFunction.timeout(timeout.f0, timeout.f1)); - } else { - return Either.Right(patternSelectFunction.select(value.right())); - } - } - } + final OutputTag<L> outputTag = new OutputTag<L>("dummy-timeouted", leftTypeInfo); - private static class PatternFlatSelectTimeoutWrapper<T, L, R> implements FlatMapFunction<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>, Either<L, R>> { + final SingleOutputStreamOperator<R> mainStream = CEPOperatorUtils.createTimeoutPatternStream( + 
inputStream, + pattern, + comparator, + clean(patternFlatSelectFunction), + rightTypeInfo, + outputTag, + clean(patternFlatTimeoutFunction)); - private static final long serialVersionUID = 7483674669662261667L; + final DataStream<L> timeoutedStream = mainStream.getSideOutput(outputTag); - private final PatternFlatSelectFunction<T, R> patternFlatSelectFunction; - private final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction; - - public PatternFlatSelectTimeoutWrapper( - PatternFlatSelectFunction<T, R> patternFlatSelectFunction, - PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction) { - this.patternFlatSelectFunction = patternFlatSelectFunction; - this.patternFlatTimeoutFunction = patternFlatTimeoutFunction; - } - - @Override - public void flatMap(Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>> value, Collector<Either<L, R>> out) throws Exception { - if (value.isLeft()) { - Tuple2<Map<String, List<T>>, Long> timeout = value.left(); - - patternFlatTimeoutFunction.timeout(timeout.f0, timeout.f1, new LeftCollector<>(out)); - } else { - patternFlatSelectFunction.flatSelect(value.right(), new RightCollector(out)); - } - } - - private static class LeftCollector<L, R> implements Collector<L> { - - private final Collector<Either<L, R>> out; - - private LeftCollector(Collector<Either<L, R>> out) { - this.out = out; - } - - @Override - public void collect(L record) { - out.collect(Either.<L, R>Left(record)); - } - - @Override - public void close() { - out.close(); - } - } - - private static class RightCollector<L, R> implements Collector<R> { - - private final Collector<Either<L, R>> out; - - private RightCollector(Collector<Either<L, R>> out) { - this.out = out; - } - - @Override - public void collect(R record) { - out.collect(Either.<L, R>Right(record)); - } + TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo); - @Override - public void close() { - out.close(); - } - } + return 
mainStream.connect(timeoutedStream).map(new CoMapTimeout<>()).returns(outTypeInfo); } /** - * Wrapper for a {@link PatternFlatSelectFunction}. - * - * @param <T> Type of the input elements - * @param <R> Type of the resulting elements + * Used for joining results from timeout side-output for API backward compatibility. */ - private static class PatternFlatSelectMapper<T, R> implements FlatMapFunction<Map<String, List<T>>, R> { + @Internal + public static class CoMapTimeout<R, L> implements CoMapFunction<R, L, Either<L, R>> { - private static final long serialVersionUID = -8610796233077989108L; + private static final long serialVersionUID = 2059391566945212552L; - private final PatternFlatSelectFunction<T, R> patternFlatSelectFunction; - - public PatternFlatSelectMapper(PatternFlatSelectFunction<T, R> patternFlatSelectFunction) { - this.patternFlatSelectFunction = patternFlatSelectFunction; + @Override + public Either<L, R> map1(R value) throws Exception { + return Either.Right(value); } @Override - public void flatMap(Map<String, List<T>> value, Collector<R> out) throws Exception { - patternFlatSelectFunction.flatSelect(value, out); + public Either<L, R> map2(L value) throws Exception { + return Either.Left(value); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 23084ee..3a1f621 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -34,6 +34,7 @@ import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; import org.apache.flink.cep.NonDuplicatingTypeSerializer; import org.apache.flink.cep.nfa.compiler.NFACompiler; 
import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler; +import org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -71,7 +72,7 @@ import java.util.Stack; /** * Non-deterministic finite automaton implementation. * - * <p>The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator CEP operator} + * <p>The {@link AbstractKeyedCEPPatternOperator CEP operator} * keeps one NFA per key, for keyed input streams, and a single global NFA for non-keyed ones. * When an event gets processed, it updates the NFA's internal state machine. * http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 66663d2..4c67e9d 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -19,6 +19,7 @@ package org.apache.flink.cep.operator; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; @@ -44,7 +45,7 @@ import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStr import org.apache.flink.runtime.state.StateInitializationContext; import 
org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; @@ -60,6 +61,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -78,9 +80,10 @@ import java.util.stream.StreamSupport; * @param <IN> Type of the input elements * @param <KEY> Type of the key on which the input stream is keyed * @param <OUT> Type of the output elements + * @param <F> user function that can be applied to matching sequences or timeouted sequences */ -public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> - extends AbstractStreamOperator<OUT> +public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Function> + extends AbstractUdfStreamOperator<OUT, F> implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace>, CheckpointedRestoringOperator { private static final long serialVersionUID = -4166778210774160757L; @@ -126,7 +129,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> final TypeSerializer<KEY> keySerializer, final NFACompiler.NFAFactory<IN> nfaFactory, final boolean migratingFromOldKeyedOperator, - final EventComparator<IN> comparator) { + final EventComparator<IN> comparator, + final F function) { + super(function); this.inputSerializer = Preconditions.checkNotNull(inputSerializer); this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime); @@ -348,7 +353,18 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> 
* @param event The current event to be processed * @param timestamp The timestamp of the event */ - protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp); + private void processEvent(NFA<IN> nfa, IN event, long timestamp) { + Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns = + nfa.process(event, timestamp); + + try { + processMatchedSequences(patterns.f0, timestamp); + processTimeoutedSequence(patterns.f1, timestamp); + } catch (Exception e) { + //rethrow as Runtime, to be able to use processEvent in Stream. + throw new RuntimeException(e); + } + } /** * Advances the time for the given NFA to the given timestamp. This can lead to pruning and @@ -357,7 +373,16 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> * @param nfa to advance the time for * @param timestamp to advance the time to */ - protected abstract void advanceTime(NFA<IN> nfa, long timestamp); + private void advanceTime(NFA<IN> nfa, long timestamp) throws Exception { + processEvent(nfa, null, timestamp); + } + + protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception; + + protected void processTimeoutedSequence( + Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, + long timestamp) throws Exception { + } ////////////////////// Backwards Compatibility ////////////////////// @@ -606,10 +631,10 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult( - previousElemSerializerAndConfig.f0, - UnloadableDummyTypeSerializer.class, - previousElemSerializerAndConfig.f1, - elementSerializer); + previousElemSerializerAndConfig.f0, + UnloadableDummyTypeSerializer.class, + previousElemSerializerAndConfig.f1, + elementSerializer); if 
(!compatResult.isRequiresMigration()) { return CompatibilityResult.compatible(); http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java index de2d8f8..a662faf 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java @@ -18,28 +18,25 @@ package org.apache.flink.cep.operator; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.ByteSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.NullByteKeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.EitherTypeInfo; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.cep.EventComparator; +import org.apache.flink.cep.PatternFlatSelectFunction; +import org.apache.flink.cep.PatternFlatTimeoutFunction; +import org.apache.flink.cep.PatternSelectFunction; import org.apache.flink.cep.PatternStream; +import org.apache.flink.cep.PatternTimeoutFunction; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.types.Either; - -import java.util.List; -import java.util.Map; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.util.OutputTag; /** * Utility methods for creating {@link PatternStream}. @@ -47,99 +44,245 @@ import java.util.Map; public class CEPOperatorUtils { /** - * Creates a data stream containing the fully matching event patterns of the NFA computation. + * Creates a data stream containing results of {@link PatternSelectFunction} to fully matching event patterns. * - * @param <K> Type of the key - * @return Data stream containing fully matched event sequences stored in a {@link Map}. The - * events are indexed by their associated names of the pattern. + * @param inputStream stream of input events + * @param pattern pattern to be search for in the stream + * @param selectFunction function to be applied to matching event sequences + * @param outTypeInfo output TypeInformation of selectFunction + * @param <IN> type of input events + * @param <OUT> type of output events + * @return Data stream containing fully matched event sequence with applied {@link PatternSelectFunction} */ - public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream( - DataStream<T> inputStream, - Pattern<T, ?> pattern, - EventComparator<T> comparator) { - final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig()); - - // check whether we use processing time - final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; - - // compile our pattern into a NFAFactory to instantiate NFAs later on - final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false); - - final SingleOutputStreamOperator<Map<String, List<T>>> patternStream; + public static 
<IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream( + final DataStream<IN> inputStream, + final Pattern<IN, ?> pattern, + final EventComparator<IN> comparator, + final PatternSelectFunction<IN, OUT> selectFunction, + final TypeInformation<OUT> outTypeInfo) { + return createPatternStream(inputStream, pattern, outTypeInfo, false, comparator, new OperatorBuilder<IN, OUT>() { + @Override + public <KEY> OneInputStreamOperator<IN, OUT> build( + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + TypeSerializer<KEY> keySerializer, + NFACompiler.NFAFactory<IN> nfaFactory, + boolean migratingFromOldKeyedOperator, + EventComparator<IN> comparator) { + return new SelectCepOperator<>( + inputSerializer, + isProcessingTime, + keySerializer, + nfaFactory, + migratingFromOldKeyedOperator, + comparator, + selectFunction + ); + } - if (inputStream instanceof KeyedStream) { - // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams - KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream; + @Override + public String getKeyedOperatorName() { + return "SelectCepOperator"; + } - TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig()); + @Override + public String getOperatorName() { + return "SelectCepOperator"; + } + }); + } - patternStream = keyedStream.transform( - "KeyedCEPPatternOperator", - (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class), - new KeyedCEPPatternOperator<>( + /** + * Creates a data stream containing results of {@link PatternFlatSelectFunction} to fully matching event patterns. 
+ * + * @param inputStream stream of input events + * @param pattern pattern to be search for in the stream + * @param selectFunction function to be applied to matching event sequences + * @param outTypeInfo output TypeInformation of selectFunction + * @param <IN> type of input events + * @param <OUT> type of output events + * @return Data stream containing fully matched event sequence with applied {@link PatternFlatSelectFunction} + */ + public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream( + final DataStream<IN> inputStream, + final Pattern<IN, ?> pattern, + final EventComparator<IN> comparator, + final PatternFlatSelectFunction<IN, OUT> selectFunction, + final TypeInformation<OUT> outTypeInfo) { + return createPatternStream(inputStream, pattern, outTypeInfo, false, comparator, new OperatorBuilder<IN, OUT>() { + @Override + public <KEY> OneInputStreamOperator<IN, OUT> build( + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + TypeSerializer<KEY> keySerializer, + NFACompiler.NFAFactory<IN> nfaFactory, + boolean migratingFromOldKeyedOperator, + EventComparator<IN> comparator) { + return new FlatSelectCepOperator<>( inputSerializer, isProcessingTime, keySerializer, nfaFactory, - true, - comparator)); - } else { + migratingFromOldKeyedOperator, + comparator, + selectFunction + ); + } - KeySelector<T, Byte> keySelector = new NullByteKeySelector<>(); - TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE; + @Override + public String getKeyedOperatorName() { + return "FlatSelectCepOperator"; + } - patternStream = inputStream.keyBy(keySelector).transform( - "CEPPatternOperator", - (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class), - new KeyedCEPPatternOperator<>( + @Override + public String getOperatorName() { + return "FlatSelectCepOperator"; + } + }); + } + + /** + * Creates a data stream containing results of {@link PatternFlatSelectFunction} to fully matching event patterns 
and + * also timeouted partially matched with applied {@link PatternFlatTimeoutFunction} as a sideoutput. + * + * @param inputStream stream of input events + * @param pattern pattern to be search for in the stream + * @param selectFunction function to be applied to matching event sequences + * @param outTypeInfo output TypeInformation of selectFunction + * @param outputTag {@link OutputTag} for a side-output with timeouted matches + * @param timeoutFunction function to be applied to timeouted event sequences + * @param <IN> type of input events + * @param <OUT1> type of fully matched events + * @param <OUT2> type of timeouted events + * @return Data stream containing fully matched event sequence with applied {@link PatternFlatSelectFunction} that + * contains timeouted patterns with applied {@link PatternFlatTimeoutFunction} as side-output + */ + public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream( + final DataStream<IN> inputStream, + final Pattern<IN, ?> pattern, + final EventComparator<IN> comparator, + final PatternFlatSelectFunction<IN, OUT1> selectFunction, + final TypeInformation<OUT1> outTypeInfo, + final OutputTag<OUT2> outputTag, + final PatternFlatTimeoutFunction<IN, OUT2> timeoutFunction) { + return createPatternStream(inputStream, pattern, outTypeInfo, true, comparator, new OperatorBuilder<IN, OUT1>() { + @Override + public <KEY> OneInputStreamOperator<IN, OUT1> build( + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + TypeSerializer<KEY> keySerializer, + NFACompiler.NFAFactory<IN> nfaFactory, + boolean migratingFromOldKeyedOperator, + EventComparator<IN> comparator) { + return new FlatSelectTimeoutCepOperator<>( inputSerializer, isProcessingTime, keySerializer, nfaFactory, - false, - comparator - )).forceNonParallel(); - } + migratingFromOldKeyedOperator, + comparator, + selectFunction, + timeoutFunction, + outputTag + ); + } - return patternStream; + @Override + public String 
getKeyedOperatorName() { + return "FlatSelectTimeoutCepOperator"; + } + + @Override + public String getOperatorName() { + return "FlatSelectTimeoutCepOperator"; + } + }); } /** - * Creates a data stream containing fully matching event patterns or partially matching event - * patterns which have timed out. The former are wrapped in a Either.Right and the latter in a - * Either.Left type. + * Creates a data stream containing results of {@link PatternSelectFunction} to fully matching event patterns and + * also timeouted partially matched with applied {@link PatternTimeoutFunction} as a sideoutput. * - * @param <K> Type of the key - * @return Data stream containing fully matched and partially matched event sequences wrapped in - * a {@link Either} instance. + * @param inputStream stream of input events + * @param pattern pattern to be search for in the stream + * @param selectFunction function to be applied to matching event sequences + * @param outTypeInfo output TypeInformation of selectFunction + * @param outputTag {@link OutputTag} for a side-output with timeouted matches + * @param timeoutFunction function to be applied to timeouted event sequences + * @param <IN> type of input events + * @param <OUT1> type of fully matched events + * @param <OUT2> type of timeouted events + * @return Data stream containing fully matched event sequence with applied {@link PatternSelectFunction} that + * contains timeouted patterns with applied {@link PatternTimeoutFunction} as side-output */ - public static <K, T> SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> createTimeoutPatternStream( - DataStream<T> inputStream, Pattern<T, ?> pattern, EventComparator<T> comparator) { + public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream( + final DataStream<IN> inputStream, + final Pattern<IN, ?> pattern, + final EventComparator<IN> comparator, + final PatternSelectFunction<IN, OUT1> selectFunction, + final 
TypeInformation<OUT1> outTypeInfo, + final OutputTag<OUT2> outputTag, + final PatternTimeoutFunction<IN, OUT2> timeoutFunction) { + return createPatternStream(inputStream, pattern, outTypeInfo, true, comparator, new OperatorBuilder<IN, OUT1>() { + @Override + public <KEY> OneInputStreamOperator<IN, OUT1> build( + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + TypeSerializer<KEY> keySerializer, + NFACompiler.NFAFactory<IN> nfaFactory, + boolean migratingFromOldKeyedOperator, + EventComparator<IN> comparator) { + return new SelectTimeoutCepOperator<>( + inputSerializer, + isProcessingTime, + keySerializer, + nfaFactory, + migratingFromOldKeyedOperator, + comparator, + selectFunction, + timeoutFunction, + outputTag + ); + } + + @Override + public String getKeyedOperatorName() { + return "SelectTimeoutCepOperator"; + } - final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig()); + @Override + public String getOperatorName() { + return "SelectTimeoutCepOperator"; + } + }); + } + + private static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream( + final DataStream<IN> inputStream, + final Pattern<IN, ?> pattern, + final TypeInformation<OUT> outTypeInfo, + final boolean timeoutHandling, + final EventComparator<IN> comparator, + final OperatorBuilder<IN, OUT> operatorBuilder) { + final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig()); // check whether we use processing time final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; // compile our pattern into a NFAFactory to instantiate NFAs later on - final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, true); - - final SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream; + final 
NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, timeoutHandling); - final TypeInformation<Map<String, List<T>>> rightTypeInfo = (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class); - final TypeInformation<Tuple2<Map<String, List<T>>, Long>> leftTypeInfo = new TupleTypeInfo<>(rightTypeInfo, BasicTypeInfo.LONG_TYPE_INFO); - final TypeInformation<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> eitherTypeInformation = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo); + final SingleOutputStreamOperator<OUT> patternStream; if (inputStream instanceof KeyedStream) { - // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams - KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream; + KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream; TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig()); patternStream = keyedStream.transform( - "TimeoutKeyedCEPPatternOperator", - eitherTypeInformation, - new TimeoutKeyedCEPPatternOperator<>( + operatorBuilder.getKeyedOperatorName(), + outTypeInfo, + operatorBuilder.build( inputSerializer, isProcessingTime, keySerializer, @@ -147,14 +290,13 @@ public class CEPOperatorUtils { true, comparator)); } else { - - KeySelector<T, Byte> keySelector = new NullByteKeySelector<>(); + KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>(); TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE; patternStream = inputStream.keyBy(keySelector).transform( - "TimeoutCEPPatternOperator", - eitherTypeInformation, - new TimeoutKeyedCEPPatternOperator<>( + operatorBuilder.getOperatorName(), + outTypeInfo, + operatorBuilder.build( inputSerializer, isProcessingTime, keySerializer, @@ -166,4 +308,18 @@ public class CEPOperatorUtils { return patternStream; } + + private interface OperatorBuilder<IN, OUT> { + <K> 
OneInputStreamOperator<IN, OUT> build( + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + TypeSerializer<K> keySerializer, + NFACompiler.NFAFactory<IN> nfaFactory, + boolean migratingFromOldKeyedOperator, + EventComparator<IN> comparator); + + String getKeyedOperatorName(); + + String getOperatorName(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java new file mode 100644 index 0000000..d44794e --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cep.EventComparator; +import org.apache.flink.cep.PatternFlatSelectFunction; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.streaming.api.operators.TimestampedCollector; + +import java.util.List; +import java.util.Map; + +/** + * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternFlatSelectFunction} to fully matched event patterns. + * + * @param <IN> Type of the input elements + * @param <KEY> Type of the key on which the input stream is keyed + * @param <OUT> Type of the output elements + */ +public class FlatSelectCepOperator<IN, KEY, OUT> + extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT, PatternFlatSelectFunction<IN, OUT>> { + private static final long serialVersionUID = 5845993459551561518L; + + public FlatSelectCepOperator( + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + TypeSerializer<KEY> keySerializer, + NFACompiler.NFAFactory<IN> nfaFactory, + boolean migratingFromOldKeyedOperator, + EventComparator<IN> comparator, + PatternFlatSelectFunction<IN, OUT> function) { + super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator, comparator, function); + } + + private transient TimestampedCollector<OUT> collector; + + @Override + public void open() throws Exception { + super.open(); + collector = new TimestampedCollector<>(output); + } + + @Override + protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception { + for (Map<String, List<IN>> match : matchesSequence) { + collector.setAbsoluteTimestamp(timestamp); + getUserFunction().flatSelect(match, collector); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java new file mode 100644 index 0000000..d46761b --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.EventComparator; +import org.apache.flink.cep.PatternFlatSelectFunction; +import org.apache.flink.cep.PatternFlatTimeoutFunction; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.util.OutputTag; + +import java.util.List; +import java.util.Map; + +/** + * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternFlatSelectFunction} to fully + * matched event patterns and {@link PatternFlatTimeoutFunction} to timeouted ones. The timeouted elements are returned + * as a side-output. + * + * @param <IN> Type of the input elements + * @param <KEY> Type of the key on which the input stream is keyed + * @param <OUT1> Type of the output elements + * @param <OUT2> Type of the timeouted output elements + */ +public class FlatSelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> extends + AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, FlatSelectTimeoutCepOperator.FlatSelectWrapper<IN, OUT1, OUT2>> { + + private transient TimestampedCollector<OUT1> collector; + + private transient TimestampedSideOutputCollector<OUT2> sideOutputCollector; + + private OutputTag<OUT2> timeoutedOutputTag; + + public FlatSelectTimeoutCepOperator( + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + TypeSerializer<KEY> keySerializer, + NFACompiler.NFAFactory<IN> nfaFactory, + boolean migratingFromOldKeyedOperator, + EventComparator<IN> comparator, + PatternFlatSelectFunction<IN, OUT1> flatSelectFunction, + PatternFlatTimeoutFunction<IN, OUT2> flatTimeoutFunction, + OutputTag<OUT2> outputTag) { + super( + inputSerializer, + 
isProcessingTime, + keySerializer, + nfaFactory, + migratingFromOldKeyedOperator, + comparator, + new FlatSelectWrapper<>(flatSelectFunction, flatTimeoutFunction)); + this.timeoutedOutputTag = outputTag; + } + + @Override + public void open() throws Exception { + super.open(); + collector = new TimestampedCollector<>(output); + sideOutputCollector = new TimestampedSideOutputCollector<>(timeoutedOutputTag, output); + } + + @Override + protected void processMatchedSequences( + Iterable<Map<String, List<IN>>> matchesSequence, + long timestamp) throws Exception { + for (Map<String, List<IN>> match : matchesSequence) { + getUserFunction().getFlatSelectFunction().flatSelect(match, collector); + } + } + + @Override + protected void processTimeoutedSequence( + Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception { + for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) { + sideOutputCollector.setAbsoluteTimestamp(timestamp); + getUserFunction().getFlatTimeoutFunction().timeout(match.f0, match.f1, sideOutputCollector); + } + } + + /** + * Wrapper that enables storing {@link PatternFlatSelectFunction} and {@link PatternFlatTimeoutFunction} functions + * in one udf. 
+ */ + @Internal + public static class FlatSelectWrapper<IN, OUT1, OUT2> implements Function { + + private static final long serialVersionUID = -8320546120157150202L; + + private PatternFlatSelectFunction<IN, OUT1> flatSelectFunction; + private PatternFlatTimeoutFunction<IN, OUT2> flatTimeoutFunction; + + @VisibleForTesting + public PatternFlatSelectFunction<IN, OUT1> getFlatSelectFunction() { + return flatSelectFunction; + } + + @VisibleForTesting + public PatternFlatTimeoutFunction<IN, OUT2> getFlatTimeoutFunction() { + return flatTimeoutFunction; + } + + public FlatSelectWrapper( + PatternFlatSelectFunction<IN, OUT1> flatSelectFunction, + PatternFlatTimeoutFunction<IN, OUT2> flatTimeoutFunction) { + this.flatSelectFunction = flatSelectFunction; + this.flatTimeoutFunction = flatTimeoutFunction; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java deleted file mode 100644 index 22f9c14..0000000 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cep.operator; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.cep.EventComparator; -import org.apache.flink.cep.nfa.NFA; -import org.apache.flink.cep.nfa.compiler.NFACompiler; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * CEP pattern operator which only returns fully matched event patterns stored in a {@link Map}. The - * events are indexed by the event names associated in the pattern specification. The operator works - * on keyed input data. 
- * - * @param <IN> Type of the input events - * @param <KEY> Type of the key - */ -public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Map<String, List<IN>>> { - private static final long serialVersionUID = 5328573789532074581L; - - public KeyedCEPPatternOperator( - TypeSerializer<IN> inputSerializer, - boolean isProcessingTime, - TypeSerializer<KEY> keySerializer, - NFACompiler.NFAFactory<IN> nfaFactory, - boolean migratingFromOldKeyedOperator, - EventComparator<IN> comparator) { - - super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator, comparator); - } - - @Override - protected void processEvent(NFA<IN> nfa, IN event, long timestamp) { - Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns = - nfa.process(event, timestamp); - - emitMatchedSequences(patterns.f0, timestamp); - } - - @Override - protected void advanceTime(NFA<IN> nfa, long timestamp) { - Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns = - nfa.process(null, timestamp); - - emitMatchedSequences(patterns.f0, timestamp); - } - - private void emitMatchedSequences(Iterable<Map<String, List<IN>>> matchedSequences, long timestamp) { - Iterator<Map<String, List<IN>>> iterator = matchedSequences.iterator(); - - if (iterator.hasNext()) { - StreamRecord<Map<String, List<IN>>> streamRecord = new StreamRecord<>(null, timestamp); - - do { - streamRecord.replace(iterator.next()); - output.collect(streamRecord); - } while (iterator.hasNext()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java new file mode 100644 index 0000000..d687c67 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cep.EventComparator; +import org.apache.flink.cep.PatternSelectFunction; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.List; +import java.util.Map; + +/** + * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternSelectFunction} to fully matched event patterns. 
+ * + * @param <IN> Type of the input elements + * @param <KEY> Type of the key on which the input stream is keyed + * @param <OUT> Type of the output elements + */ +public class SelectCepOperator<IN, KEY, OUT> + extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT, PatternSelectFunction<IN, OUT>> { + public SelectCepOperator( + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + TypeSerializer<KEY> keySerializer, + NFACompiler.NFAFactory<IN> nfaFactory, + boolean migratingFromOldKeyedOperator, + EventComparator<IN> comparator, + PatternSelectFunction<IN, OUT> function) { + super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator, comparator, function); + } + + @Override + protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception { + for (Map<String, List<IN>> match : matchesSequence) { + output.collect(new StreamRecord<>(getUserFunction().select(match), timestamp)); + } + } +}