Repository: flink Updated Branches: refs/heads/master 3e706b13a -> b3ffd919f
[hotfix] [cep] Spelling corrections Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3ffd919 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3ffd919 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3ffd919 Branch: refs/heads/master Commit: b3ffd919fd1497fb838d36d5b3a31b2402f0de71 Parents: 3e706b1 Author: Dawid Wysakowicz <dwysakow...@apache.org> Authored: Thu Aug 24 08:18:58 2017 +0200 Committer: Dawid Wysakowicz <dwysakow...@apache.org> Committed: Thu Aug 24 08:18:58 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flink/cep/PatternStream.java | 26 +++++++++++--------- .../main/java/org/apache/flink/cep/nfa/NFA.java | 4 +-- .../AbstractKeyedCEPPatternOperator.java | 8 +++--- .../flink/cep/operator/CEPOperatorUtils.java | 20 +++++++-------- .../cep/operator/FlatSelectCepOperator.java | 4 +-- .../operator/FlatSelectTimeoutCepOperator.java | 16 ++++++------ .../flink/cep/operator/SelectCepOperator.java | 4 +-- .../cep/operator/SelectTimeoutCepOperator.java | 18 +++++++------- .../flink/cep/operator/CEPOperatorTest.java | 8 +++--- 9 files changed, 55 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java index 6380375..79ca736 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java @@ -31,6 +31,8 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.types.Either; import org.apache.flink.util.OutputTag; +import java.util.UUID; + /** * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected * pattern sequences as a map of events associated with their names. The pattern is detected using a @@ -145,7 +147,7 @@ public class PatternStream<T> { * {@link SingleOutputStreamOperator} resulting from the select operation * with the same {@link OutputTag}. * - * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns + * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns * @param patternTimeoutFunction The pattern timeout function which is called for each partial * pattern sequence which has timed out. * @param patternSelectFunction The pattern select function which is called for each detected @@ -192,7 +194,7 @@ public class PatternStream<T> { * {@link SingleOutputStreamOperator} resulting from the select operation * with the same {@link OutputTag}. * - * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns + * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns * @param patternTimeoutFunction The pattern timeout function which is called for each partial * pattern sequence which has timed out. * @param outTypeInfo Explicit specification of output type. @@ -235,7 +237,7 @@ public class PatternStream<T> { * @param <R> Type of the resulting elements * * @deprecated Use {@link PatternStream#select(OutputTag, PatternTimeoutFunction, PatternSelectFunction)} - * that returns timeouted events as a side-output + * that returns timed out events as a side-output * * @return {@link DataStream} which contains the resulting elements or the resulting timeout * elements wrapped in an {@link Either} type. @@ -267,7 +269,7 @@ public class PatternStream<T> { null, false); - final OutputTag<L> outputTag = new OutputTag<L>("dummy-timeouted", leftTypeInfo); + final OutputTag<L> outputTag = new OutputTag<L>(UUID.randomUUID().toString(), leftTypeInfo); final SingleOutputStreamOperator<R> mainStream = CEPOperatorUtils.createTimeoutPatternStream( inputStream, @@ -278,11 +280,11 @@ public class PatternStream<T> { outputTag, clean(patternTimeoutFunction)); - final DataStream<L> timeoutedStream = mainStream.getSideOutput(outputTag); + final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag); TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo); - return mainStream.connect(timeoutedStream).map(new CoMapTimeout<>()).returns(outTypeInfo); + return mainStream.connect(timedOutStream).map(new CoMapTimeout<>()).returns(outTypeInfo); } /** @@ -350,7 +352,7 @@ public class PatternStream<T> { * {@link SingleOutputStreamOperator} resulting from the select operation * with the same {@link OutputTag}. * - * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns + * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns * @param patternFlatTimeoutFunction The pattern timeout function which is called for each partial * pattern sequence which has timed out. * @param patternFlatSelectFunction The pattern select function which is called for each detected @@ -393,7 +395,7 @@ public class PatternStream<T> { * {@link SingleOutputStreamOperator} resulting from the select operation * with the same {@link OutputTag}. * - * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns + * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns * @param patternFlatTimeoutFunction The pattern timeout function which is called for each partial * pattern sequence which has timed out. * @param patternFlatSelectFunction The pattern select function which is called for each detected @@ -437,7 +439,7 @@ public class PatternStream<T> { * @param <R> Type of the resulting events * * @deprecated Use {@link PatternStream#flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)} - * that returns timeouted events as a side-output + * that returns timed out events as a side-output * * @return {@link DataStream} which contains the resulting events from the pattern flat select * function or the resulting timeout events from the pattern flat timeout function wrapped in an @@ -470,7 +472,7 @@ public class PatternStream<T> { null, false); - final OutputTag<L> outputTag = new OutputTag<L>("dummy-timeouted", leftTypeInfo); + final OutputTag<L> outputTag = new OutputTag<L>(UUID.randomUUID().toString(), leftTypeInfo); final SingleOutputStreamOperator<R> mainStream = CEPOperatorUtils.createTimeoutPatternStream( inputStream, @@ -481,11 +483,11 @@ public class PatternStream<T> { outputTag, clean(patternFlatTimeoutFunction)); - final DataStream<L> timeoutedStream = mainStream.getSideOutput(outputTag); + final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag); TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo); - return mainStream.connect(timeoutedStream).map(new CoMapTimeout<>()).returns(outTypeInfo); + return mainStream.connect(timedOutStream).map(new CoMapTimeout<>()).returns(outTypeInfo); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 3a1f621..11f14b9 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -249,8 +249,8 @@ public class NFA<T> implements Serializable { if (handleTimeout) { // extract the timed out event pattern - Map<String, List<T>> timedoutPattern = extractCurrentMatches(computationState); - timeoutResult.add(Tuple2.of(timedoutPattern, timestamp)); + Map<String, List<T>> timedOutPattern = extractCurrentMatches(computationState); + timeoutResult.add(Tuple2.of(timedOutPattern, timestamp)); } eventSharedBuffer.release( http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 4c67e9d..7556d9f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -80,7 +80,7 @@ import java.util.stream.StreamSupport; * @param <IN> Type of the input elements * @param <KEY> Type of the key on which the input stream is keyed * @param <OUT> Type of the output elements - * @param <F> user function that can be applied to matching sequences or timeouted sequences + * @param <F> user function that can be applied to matching sequences or timed out sequences */ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Function> extends AbstractUdfStreamOperator<OUT, F> @@ -359,7 +359,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu try { processMatchedSequences(patterns.f0, timestamp); - processTimeoutedSequence(patterns.f1, timestamp); + processTimedOutSequences(patterns.f1, timestamp); } catch (Exception e) { //rethrow as Runtime, to be able to use processEvent in Stream. throw new RuntimeException(e); @@ -377,9 +377,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu processEvent(nfa, null, timestamp); } - protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception; + protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception; - protected void processTimeoutedSequence( + protected void processTimedOutSequences( Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception { } http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java index a662faf..cef11e2 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java @@ -143,19 +143,19 @@ public class CEPOperatorUtils { /** * Creates a data stream containing results of {@link PatternFlatSelectFunction} to fully matching event patterns and - * also timeouted partially matched with applied {@link PatternFlatTimeoutFunction} as a sideoutput. + * also timed out partially matched with applied {@link PatternFlatTimeoutFunction} as a sideoutput. * * @param inputStream stream of input events * @param pattern pattern to be search for in the stream * @param selectFunction function to be applied to matching event sequences * @param outTypeInfo output TypeInformation of selectFunction - * @param outputTag {@link OutputTag} for a side-output with timeouted matches - * @param timeoutFunction function to be applied to timeouted event sequences + * @param outputTag {@link OutputTag} for a side-output with timed out matches + * @param timeoutFunction function to be applied to timed out event sequences * @param <IN> type of input events * @param <OUT1> type of fully matched events - * @param <OUT2> type of timeouted events + * @param <OUT2> type of timed out events * @return Data stream containing fully matched event sequence with applied {@link PatternFlatSelectFunction} that - * contains timeouted patterns with applied {@link PatternFlatTimeoutFunction} as side-output + * contains timed out patterns with applied {@link PatternFlatTimeoutFunction} as side-output */ public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream( final DataStream<IN> inputStream, @@ -201,19 +201,19 @@ public class CEPOperatorUtils { /** * Creates a data stream containing results of {@link PatternSelectFunction} to fully matching event patterns and - * also timeouted partially matched with applied {@link PatternTimeoutFunction} as a sideoutput. + * also timed out partially matched with applied {@link PatternTimeoutFunction} as a sideoutput. * * @param inputStream stream of input events * @param pattern pattern to be search for in the stream * @param selectFunction function to be applied to matching event sequences * @param outTypeInfo output TypeInformation of selectFunction - * @param outputTag {@link OutputTag} for a side-output with timeouted matches - * @param timeoutFunction function to be applied to timeouted event sequences + * @param outputTag {@link OutputTag} for a side-output with timed out matches + * @param timeoutFunction function to be applied to timed out event sequences * @param <IN> type of input events * @param <OUT1> type of fully matched events - * @param <OUT2> type of timeouted events + * @param <OUT2> type of timed out events * @return Data stream containing fully matched event sequence with applied {@link PatternSelectFunction} that - * contains timeouted patterns with applied {@link PatternTimeoutFunction} as side-output + * contains timed out patterns with applied {@link PatternTimeoutFunction} as side-output */ public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream( final DataStream<IN> inputStream, http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java index d44794e..192a38b 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java @@ -58,8 +58,8 @@ public class FlatSelectCepOperator<IN, KEY, OUT> } @Override - protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception { - for (Map<String, List<IN>> match : matchesSequence) { + protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception { + for (Map<String, List<IN>> match : matchingSequences) { collector.setAbsoluteTimestamp(timestamp); getUserFunction().flatSelect(match, collector); } http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java index d46761b..58a9d53 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java @@ -35,13 +35,13 @@ import java.util.Map; /** * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternFlatSelectFunction} to fully - * matched event patterns and {@link PatternFlatTimeoutFunction} to timeouted ones. The timeouted elements are returned + * matched event patterns and {@link PatternFlatTimeoutFunction} to timed out ones. The timed out elements are returned * as a side-output. * * @param <IN> Type of the input elements * @param <KEY> Type of the key on which the input stream is keyed * @param <OUT1> Type of the output elements - * @param <OUT2> Type of the timeouted output elements + * @param <OUT2> Type of the timed out output elements */ public class FlatSelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, FlatSelectTimeoutCepOperator.FlatSelectWrapper<IN, OUT1, OUT2>> { @@ -50,7 +50,7 @@ public class FlatSelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> extends private transient TimestampedSideOutputCollector<OUT2> sideOutputCollector; - private OutputTag<OUT2> timeoutedOutputTag; + private OutputTag<OUT2> timedOutOutputTag; public FlatSelectTimeoutCepOperator( TypeSerializer<IN> inputSerializer, @@ -70,27 +70,27 @@ public class FlatSelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> extends migratingFromOldKeyedOperator, comparator, new FlatSelectWrapper<>(flatSelectFunction, flatTimeoutFunction)); - this.timeoutedOutputTag = outputTag; + this.timedOutOutputTag = outputTag; } @Override public void open() throws Exception { super.open(); collector = new TimestampedCollector<>(output); - sideOutputCollector = new TimestampedSideOutputCollector<>(timeoutedOutputTag, output); + sideOutputCollector = new TimestampedSideOutputCollector<>(timedOutOutputTag, output); } @Override protected void processMatchedSequences( - Iterable<Map<String, List<IN>>> matchesSequence, + Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception { - for (Map<String, List<IN>> match : matchesSequence) { + for (Map<String, List<IN>> match : matchingSequences) { getUserFunction().getFlatSelectFunction().flatSelect(match, collector); } } @Override - protected void processTimeoutedSequence( + protected void processTimedOutSequences( Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception { for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) { sideOutputCollector.setAbsoluteTimestamp(timestamp); http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java index d687c67..acd3cd3 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java @@ -48,8 +48,8 @@ public class SelectCepOperator<IN, KEY, OUT> } @Override - protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception { - for (Map<String, List<IN>> match : matchesSequence) { + protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception { + for (Map<String, List<IN>> match : matchingSequences) { output.collect(new StreamRecord<>(getUserFunction().select(match), timestamp)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java index b46b801..d03e25c 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java @@ -34,18 +34,18 @@ import java.util.Map; /** * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternSelectFunction} to fully - * matched event patterns and {@link PatternTimeoutFunction} to timeouted ones. The timeouted elements are returned + * matched event patterns and {@link PatternTimeoutFunction} to timed out ones. The timed out elements are returned * as a side-output. * * @param <IN> Type of the input elements * @param <KEY> Type of the key on which the input stream is keyed * @param <OUT1> Type of the output elements - * @param <OUT2> Type of the timeouted output elements + * @param <OUT2> Type of the timed out output elements */ public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> { - private OutputTag<OUT2> timeoutedOutputTag; + private OutputTag<OUT2> timedOutOutputTag; public SelectTimeoutCepOperator( TypeSerializer<IN> inputSerializer, @@ -65,21 +65,21 @@ public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> migratingFromOldKeyedOperator, comparator, new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction)); - this.timeoutedOutputTag = outputTag; + this.timedOutOutputTag = outputTag; } @Override - protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception { - for (Map<String, List<IN>> match : matchesSequence) { + protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception { + for (Map<String, List<IN>> match : matchingSequences) { output.collect(new StreamRecord<>(getUserFunction().getFlatSelectFunction().select(match), timestamp)); } } @Override - protected void processTimeoutedSequence( + protected void processTimedOutSequences( Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception { for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) { - output.collect(timeoutedOutputTag, + output.collect(timedOutOutputTag, new StreamRecord<>( getUserFunction().getFlatTimeoutFunction().timeout(match.f0, match.f1), timestamp)); @@ -91,7 +91,7 @@ public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> * * @param <IN> Type of the input elements * @param <OUT1> Type of the output elements - * @param <OUT2> Type of the timeouted output elements + * @param <OUT2> Type of the timed out output elements */ @Internal public static class SelectWrapper<IN, OUT1, OUT2> implements Function { http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 14cdd53..a2ac124 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -249,8 +249,8 @@ public class CEPOperatorTest extends TestLogger { final Map<String, List<Event>> expectedSequence = new HashMap<>(2); expectedSequence.put("start", Collections.<Event>singletonList(startEvent)); - final OutputTag<Tuple2<Map<String, List<Event>>, Long>> timeouted = - new OutputTag<Tuple2<Map<String, List<Event>>, Long>>("timeouted") {}; + final OutputTag<Tuple2<Map<String, List<Event>>, Long>> timedOut = + new OutputTag<Tuple2<Map<String, List<Event>>, Long>>("timedOut") {}; final KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new SelectTimeoutCepOperator<>( @@ -274,7 +274,7 @@ public class CEPOperatorTest extends TestLogger { return Tuple2.of(pattern, timeoutTimestamp); } }, - timeouted + timedOut ), new KeySelector<Event, Integer>() { @Override public Integer getKey(Event value) throws Exception { @@ -294,7 +294,7 @@ public class CEPOperatorTest extends TestLogger { harness.processWatermark(new Watermark(watermarkTimestamp2)); Queue<Object> result = harness.getOutput(); - Queue<StreamRecord<Tuple2<Map<String, List<Event>>, Long>>> sideOutput = harness.getSideOutput(timeouted); + Queue<StreamRecord<Tuple2<Map<String, List<Event>>, Long>>> sideOutput = harness.getSideOutput(timedOut); assertEquals(2L, result.size()); assertEquals(1L, sideOutput.size());