Repository: flink Updated Branches: refs/heads/master 9995588c8 -> 6ed5815e8
http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java new file mode 100644 index 0000000..b46b801 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.EventComparator; +import org.apache.flink.cep.PatternSelectFunction; +import org.apache.flink.cep.PatternTimeoutFunction; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; + +import java.util.List; +import java.util.Map; + +/** + * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternSelectFunction} to fully + * matched event patterns and {@link PatternTimeoutFunction} to timeouted ones. The timeouted elements are returned + * as a side-output. + * + * @param <IN> Type of the input elements + * @param <KEY> Type of the key on which the input stream is keyed + * @param <OUT1> Type of the output elements + * @param <OUT2> Type of the timeouted output elements + */ +public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> + extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> { + + private OutputTag<OUT2> timeoutedOutputTag; + + public SelectTimeoutCepOperator( + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + TypeSerializer<KEY> keySerializer, + NFACompiler.NFAFactory<IN> nfaFactory, + boolean migratingFromOldKeyedOperator, + final EventComparator<IN> comparator, + PatternSelectFunction<IN, OUT1> flatSelectFunction, + PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction, + OutputTag<OUT2> outputTag) { + super( + inputSerializer, + isProcessingTime, + keySerializer, + nfaFactory, + migratingFromOldKeyedOperator, + comparator, + new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction)); + this.timeoutedOutputTag = outputTag; + } + + @Override + protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception { + for (Map<String, List<IN>> match : matchesSequence) { + output.collect(new StreamRecord<>(getUserFunction().getFlatSelectFunction().select(match), timestamp)); + } + } + + @Override + protected void processTimeoutedSequence( + Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception { + for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) { + output.collect(timeoutedOutputTag, + new StreamRecord<>( + getUserFunction().getFlatTimeoutFunction().timeout(match.f0, match.f1), + timestamp)); + } + } + + /** + * Wrapper that enables storing {@link PatternSelectFunction} and {@link PatternTimeoutFunction} in one udf. + * + * @param <IN> Type of the input elements + * @param <OUT1> Type of the output elements + * @param <OUT2> Type of the timeouted output elements + */ + @Internal + public static class SelectWrapper<IN, OUT1, OUT2> implements Function { + + private static final long serialVersionUID = -8320546120157150202L; + + private PatternSelectFunction<IN, OUT1> flatSelectFunction; + private PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction; + + PatternSelectFunction<IN, OUT1> getFlatSelectFunction() { + return flatSelectFunction; + } + + PatternTimeoutFunction<IN, OUT2> getFlatTimeoutFunction() { + return flatTimeoutFunction; + } + + public SelectWrapper( + PatternSelectFunction<IN, OUT1> flatSelectFunction, + PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction) { + this.flatSelectFunction = flatSelectFunction; + this.flatTimeoutFunction = flatTimeoutFunction; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java deleted file mode 100644 index ca58955..0000000 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cep.operator; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.cep.EventComparator; -import org.apache.flink.cep.nfa.NFA; -import org.apache.flink.cep.nfa.compiler.NFACompiler; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.types.Either; - -import java.util.Collection; -import java.util.List; -import java.util.Map; - -/** - * CEP pattern operator which returns fully and partially matched (timed-out) event patterns stored in a - * {@link Map}. The events are indexed by the event names associated in the pattern specification. The - * operator works on keyed input data. - * - * @param <IN> Type of the input events - * @param <KEY> Type of the key - */ -public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Either<Tuple2<Map<String, List<IN>>, Long>, Map<String, List<IN>>>> { - private static final long serialVersionUID = 3570542177814518158L; - - public TimeoutKeyedCEPPatternOperator( - TypeSerializer<IN> inputSerializer, - boolean isProcessingTime, - TypeSerializer<KEY> keySerializer, - NFACompiler.NFAFactory<IN> nfaFactory, - boolean migratingFromOldKeyedOperator, - EventComparator<IN> comparator) { - - super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator, comparator); - } - - @Override - protected void processEvent(NFA<IN> nfa, IN event, long timestamp) { - Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns = - nfa.process(event, timestamp); - - emitMatchedSequences(patterns.f0, timestamp); - emitTimedOutSequences(patterns.f1, timestamp); - } - - @Override - protected void advanceTime(NFA<IN> nfa, long timestamp) { - Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns = - nfa.process(null, timestamp); - - emitMatchedSequences(patterns.f0, timestamp); - emitTimedOutSequences(patterns.f1, timestamp); - } - - private void emitTimedOutSequences(Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) { - StreamRecord<Either<Tuple2<Map<String, List<IN>>, Long>, Map<String, List<IN>>>> streamRecord = - new StreamRecord<>(null, timestamp); - - for (Tuple2<Map<String, List<IN>>, Long> partialPattern: timedOutSequences) { - streamRecord.replace(Either.Left(partialPattern)); - output.collect(streamRecord); - } - } - - protected void emitMatchedSequences(Iterable<Map<String, List<IN>>> matchedSequences, long timestamp) { - StreamRecord<Either<Tuple2<Map<String, List<IN>>, Long>, Map<String, List<IN>>>> streamRecord = - new StreamRecord<>(null, timestamp); - - for (Map<String, List<IN>> matchedPattern : matchedSequences) { - streamRecord.replace(Either.Right(matchedPattern)); - output.collect(streamRecord); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java new file mode 100644 index 0000000..5336543 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +/** + * Wrapper around an {@link Output} for user functions that expect a {@link Collector}. + * Before giving the {@link TimestampedSideOutputCollector} to a user function you must set + * the timestamp that should be attached to emitted elements. Most operators + * would set the timestamp of the incoming + * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here. + * + * <p>This version emits results into a SideOutput specified by given {@link OutputTag} + * + * @param <T> The type of the elements that can be emitted. + */ +@Internal +public class TimestampedSideOutputCollector<T> implements Collector<T> { + + private final Output<?> output; + + private final StreamRecord<T> reuse; + + private final OutputTag<T> outputTag; + + /** + * Creates a new {@link TimestampedSideOutputCollector} that wraps the given {@link Output} and collects + * results into sideoutput corresponding to {@link OutputTag}. + */ + public TimestampedSideOutputCollector(OutputTag<T> outputTag, Output<?> output) { + this.output = output; + this.outputTag = outputTag; + this.reuse = new StreamRecord<T>(null); + } + + @Override + public void collect(T record) { + output.collect(outputTag, reuse.replace(record)); + } + + public void setTimestamp(StreamRecord<?> timestampBase) { + if (timestampBase.hasTimestamp()) { + reuse.setTimestamp(timestampBase.getTimestamp()); + } else { + reuse.eraseTimestamp(); + } + } + + public void setAbsoluteTimestamp(long timestamp) { + reuse.setTimestamp(timestamp); + } + + public void eraseTimestamp() { + reuse.eraseTimestamp(); + } + + @Override + public void close() { + output.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index 95987c2..843d668 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -20,8 +20,6 @@ package org.apache.flink.cep.operator; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.ByteSerializer; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.NullByteKeySelector; import org.apache.flink.cep.Event; import org.apache.flink.cep.SubEvent; @@ -63,15 +61,6 @@ public class CEPMigration11to13Test { @Test public void testKeyedCEPOperatorMigratation() throws Exception { - KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - final Event startEvent = new Event(42, "start", 1.0); final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); final Event endEvent = new Event(42, "end", 1.0); @@ -79,7 +68,7 @@ public class CEPMigration11to13Test { // uncomment these lines for regenerating the snapshot on Flink 1.1 /* OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( + new KeyedCepOperator<>( Event.createTypeSerializer(), false, keySelector, @@ -104,17 +93,8 @@ public class CEPMigration11to13Test { harness.close(); */ - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = - new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true, - null), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness( + CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory())); try { harness.setup(); @@ -158,16 +138,9 @@ public class CEPMigration11to13Test { OperatorStateHandles snapshot = harness.snapshot(1L, 1L); harness.close(); - harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true, - null), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + harness = CepOperatorTestUtilities.getCepTestHarness(CepOperatorTestUtilities.getKeyedCepOpearator( + false, + new NFAFactory())); harness.setup(); harness.initializeState(snapshot); @@ -234,16 +207,10 @@ public class CEPMigration11to13Test { NullByteKeySelector keySelector = new NullByteKeySelector(); OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = - new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - ByteSerializer.INSTANCE, - new NFAFactory(), - false, - null), - keySelector, - BasicTypeInfo.BYTE_TYPE_INFO); + new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>( + CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory(), ByteSerializer.INSTANCE, false, null), + keySelector, + BasicTypeInfo.BYTE_TYPE_INFO); try { harness.setup(); @@ -288,13 +255,7 @@ public class CEPMigration11to13Test { harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - ByteSerializer.INSTANCE, - new NFAFactory(), - false, - null), + CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory(), ByteSerializer.INSTANCE), keySelector, BasicTypeInfo.BYTE_TYPE_INFO); http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java index 0eeff09..cf3c921 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java @@ -19,7 +19,6 @@ package org.apache.flink.cep.operator; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.cep.Event; import org.apache.flink.cep.SubEvent; @@ -48,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; +import static org.apache.flink.cep.operator.CepOperatorTestUtilities.getKeyedCepOpearator; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -100,13 +100,7 @@ public class CEPMigrationTest { OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true, - null), + getKeyedCepOpearator(false, new NFAFactory()), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -151,13 +145,7 @@ public class CEPMigrationTest { OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true, - null), + getKeyedCepOpearator(false, new NFAFactory()), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -221,13 +209,7 @@ public class CEPMigrationTest { harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true, - null), + getKeyedCepOpearator(false, new NFAFactory()), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -282,13 +264,7 @@ public class CEPMigrationTest { OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true, - null), + getKeyedCepOpearator(false, new NFAFactory()), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -331,13 +307,7 @@ public class CEPMigrationTest { OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true, - null), + getKeyedCepOpearator(false, new NFAFactory()), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -415,13 +385,7 @@ public class CEPMigrationTest { harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true, - null), + getKeyedCepOpearator(false, new NFAFactory()), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -475,13 +439,7 @@ public class CEPMigrationTest { OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new SinglePatternNFAFactory(), - true, - null), + getKeyedCepOpearator(false, new SinglePatternNFAFactory()), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -515,13 +473,7 @@ public class CEPMigrationTest { OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new SinglePatternNFAFactory(), - true, - null), + getKeyedCepOpearator(false, new SinglePatternNFAFactory()), keySelector, BasicTypeInfo.INT_TYPE_INFO); http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 8cf67ad..14cdd53 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -26,6 +26,8 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.cep.Event; +import org.apache.flink.cep.PatternSelectFunction; +import org.apache.flink.cep.PatternTimeoutFunction; import org.apache.flink.cep.SubEvent; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; @@ -40,7 +42,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.types.Either; +import org.apache.flink.util.OutputTag; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; @@ -66,7 +68,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.validateMockitoUsage; /** - * Tests for {@link KeyedCEPPatternOperator} and {@link TimeoutKeyedCEPPatternOperator}. + * Tests for {@link AbstractKeyedCEPPatternOperator}. */ public class CEPOperatorTest extends TestLogger { @@ -240,8 +242,6 @@ public class CEPOperatorTest extends TestLogger { */ @Test public void testKeyedAdvancingTimeWithoutElements() throws Exception { - final KeySelector<Event, Integer> keySelector = new TestKeySelector(); - final Event startEvent = new Event(42, "start", 1.0); final long watermarkTimestamp1 = 5L; final long watermarkTimestamp2 = 13L; @@ -249,21 +249,43 @@ public class CEPOperatorTest extends TestLogger { final Map<String, List<Event>> expectedSequence = new HashMap<>(2); expectedSequence.put("start", Collections.<Event>singletonList(startEvent)); - OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new TimeoutKeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(true), - true, - null), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + final OutputTag<Tuple2<Map<String, List<Event>>, Long>> timeouted = + new OutputTag<Tuple2<Map<String, List<Event>>, Long>>("timeouted") {}; + final KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new SelectTimeoutCepOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFAFactory(true), + true, + null, + new PatternSelectFunction<Event, Map<String, List<Event>>>() { + @Override + public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception { + return pattern; + } + }, + new PatternTimeoutFunction<Event, Tuple2<Map<String, List<Event>>, Long>>() { + @Override + public Tuple2<Map<String, List<Event>>, Long> timeout( + Map<String, List<Event>> pattern, + long timeoutTimestamp) throws Exception { + return Tuple2.of(pattern, timeoutTimestamp); + } + }, + timeouted + ), new KeySelector<Event, Integer>() { + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }, BasicTypeInfo.INT_TYPE_INFO); try { harness.setup( new KryoSerializer<>( - (Class<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>>) (Object) Either.class, + (Class<Map<String, List<Event>>>) (Object) Map.class, new ExecutionConfig())); harness.open(); @@ -272,8 +294,10 @@ public class CEPOperatorTest extends TestLogger { harness.processWatermark(new Watermark(watermarkTimestamp2)); Queue<Object> result = harness.getOutput(); + Queue<StreamRecord<Tuple2<Map<String, List<Event>>, Long>>> sideOutput = harness.getSideOutput(timeouted); - assertEquals(3L, result.size()); + assertEquals(2L, result.size()); + assertEquals(1L, sideOutput.size()); Object watermark1 = result.poll(); @@ -281,19 +305,7 @@ public class CEPOperatorTest extends TestLogger { assertEquals(watermarkTimestamp1, ((Watermark) watermark1).getTimestamp()); - Object resultObject = result.poll(); - - assertTrue(resultObject instanceof StreamRecord); - - StreamRecord<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>> streamRecord = - (StreamRecord<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>>) resultObject; - - assertTrue(streamRecord.getValue() instanceof Either.Left); - - Either.Left<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>> left = - (Either.Left<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>) streamRecord.getValue(); - - Tuple2<Map<String, List<Event>>, Long> leftResult = left.left(); + Tuple2<Map<String, List<Event>>, Long> leftResult = sideOutput.poll().getValue(); assertEquals(watermarkTimestamp2, (long) leftResult.f1); assertEquals(expectedSequence, leftResult.f0); @@ -310,14 +322,12 @@ public class CEPOperatorTest extends TestLogger { @Test public void testKeyedCEPOperatorNFAUpdate() throws Exception { - KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - true, - IntSerializer.INSTANCE, - new SimpleNFAFactory(), + + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator( true, - null); - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator); + new SimpleNFAFactory()); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness( + operator); try { harness.open(); @@ -332,14 +342,8 @@ public class CEPOperatorTest extends TestLogger { OperatorStateHandles snapshot = harness.snapshot(0L, 0L); harness.close(); - operator = new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - true, - IntSerializer.INSTANCE, - new SimpleNFAFactory(), - true, - null); - harness = getCepTestHarness(operator); + operator = CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()); + harness = CepOperatorTestUtilities.getCepTestHarness(operator); harness.setup(); harness.initializeState(snapshot); @@ -349,14 +353,8 @@ public class CEPOperatorTest extends TestLogger { OperatorStateHandles snapshot2 = harness.snapshot(0L, 0L); harness.close(); - operator = new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - true, - IntSerializer.INSTANCE, - new SimpleNFAFactory(), - true, - null); - harness = getCepTestHarness(operator); + operator = CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()); + harness = CepOperatorTestUtilities.getCepTestHarness(operator); harness.setup(); harness.initializeState(snapshot2); @@ -384,14 +382,11 @@ public class CEPOperatorTest extends TestLogger { RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); - KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator( true, - IntSerializer.INSTANCE, - new SimpleNFAFactory(), - true, - null); - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator); + new SimpleNFAFactory()); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness( + operator); try { harness.setStateBackend(rocksDBStateBackend); @@ -408,14 +403,8 @@ public class CEPOperatorTest extends TestLogger { OperatorStateHandles snapshot = harness.snapshot(0L, 0L); harness.close(); - operator = new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - true, - IntSerializer.INSTANCE, - new SimpleNFAFactory(), - true, - null); - harness = getCepTestHarness(operator); + operator = CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()); + harness = CepOperatorTestUtilities.getCepTestHarness(operator); rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); @@ -428,14 +417,8 @@ public class CEPOperatorTest extends TestLogger { OperatorStateHandles snapshot2 = harness.snapshot(0L, 0L); harness.close(); - operator = new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - true, - IntSerializer.INSTANCE, - new SimpleNFAFactory(), - true, - null); - harness = getCepTestHarness(operator); + operator = CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()); + harness = CepOperatorTestUtilities.getCepTestHarness(operator); rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); @@ -461,14 +444,10 @@ public class CEPOperatorTest extends TestLogger { @Test public void testKeyedCEPOperatorNFAUpdateTimes() throws Exception { - KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - true, - IntSerializer.INSTANCE, - new SimpleNFAFactory(), + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator( true, - null); - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator); + new SimpleNFAFactory()); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator); try { harness.open(); @@ -507,14 +486,11 @@ public class CEPOperatorTest extends TestLogger { RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); - KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - true, - IntSerializer.INSTANCE, - new SimpleNFAFactory(), + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator( true, - null); - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator); + new SimpleNFAFactory()); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness( + operator); try { harness.setStateBackend(rocksDBStateBackend); @@ -561,8 +537,8 @@ public class CEPOperatorTest extends TestLogger { Event startEventK2 = new Event(43, "start", 1.0); - KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOperator(false); - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator); + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(false); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator); try { harness.open(); @@ -606,8 +582,8 @@ public class CEPOperatorTest extends TestLogger { OperatorStateHandles snapshot = harness.snapshot(0L, 0L); harness.close(); - KeyedCEPPatternOperator<Event, Integer> operator2 = getKeyedCepOperator(false); - harness = getCepTestHarness(operator2); + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(false); + harness = CepOperatorTestUtilities.getCepTestHarness(operator2); harness.setup(); harness.initializeState(snapshot); harness.open(); @@ -657,14 +633,10 @@ public class CEPOperatorTest extends TestLogger { Event middle1Event3 = new Event(41, "a", 4.0); Event middle2Event1 = new Event(41, "b", 5.0); - KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new ComplexNFAFactory(), - true, - null); - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator); + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator( + false, + new ComplexNFAFactory()); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator); try { harness.open(); @@ -756,8 +728,8 @@ public class CEPOperatorTest extends TestLogger { Event startEventK2 = new Event(43, "start", 1.0); - KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOperator(true); - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator); + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(true); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator); try { harness.open(); @@ -784,8 +756,8 @@ public class CEPOperatorTest extends TestLogger { OperatorStateHandles snapshot = harness.snapshot(0L, 0L); harness.close(); - KeyedCEPPatternOperator<Event, Integer> operator2 = getKeyedCepOperator(true); - harness = getCepTestHarness(operator2); + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(true); + harness = CepOperatorTestUtilities.getCepTestHarness(operator2); harness.setup(); harness.initializeState(snapshot); harness.open(); @@ -876,23 +848,18 @@ public class CEPOperatorTest extends TestLogger { } }); - KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFACompiler.NFAFactory<Event>() { + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator( + false, + new NFACompiler.NFAFactory<Event>() { + private static final long serialVersionUID = 477082663248051994L; - private static final long serialVersionUID = 477082663248051994L; - - @Override - public NFA<Event> createNFA() { - return NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - } - }, - true, - null); + @Override + public NFA<Event> createNFA() { + return NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + } + }); - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator); try { harness.setStateBackend(rocksDBStateBackend); @@ -952,8 +919,8 @@ public class CEPOperatorTest extends TestLogger { Event startEventK2 = new Event(43, "start", 1.0); - KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOperatorWithComparator(true); - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator); + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(true); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator); try { harness.open(); @@ -980,8 +947,8 @@ public class CEPOperatorTest extends TestLogger { OperatorStateHandles snapshot = harness.snapshot(0L, 0L); harness.close(); - KeyedCEPPatternOperator<Event, Integer> operator2 = getKeyedCepOperatorWithComparator(true); - harness = getCepTestHarness(operator2); + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperatorWithComparator(true); + harness = CepOperatorTestUtilities.getCepTestHarness(operator2); harness.setup(); harness.initializeState(snapshot); harness.open(); @@ -1009,8 +976,8 @@ public class CEPOperatorTest extends TestLogger { Event startEventK2 = new Event(43, "start", 1.0); - KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOperatorWithComparator(false); - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator); + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(false); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator); try { harness.open(); @@ -1041,8 +1008,8 @@ public class CEPOperatorTest extends TestLogger { OperatorStateHandles snapshot = harness.snapshot(0L, 0L); harness.close(); - KeyedCEPPatternOperator<Event, Integer> operator2 = getKeyedCepOperatorWithComparator(false); - harness = getCepTestHarness(operator2); + SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperatorWithComparator(false); + harness = CepOperatorTestUtilities.getCepTestHarness(operator2); harness.setup(); harness.initializeState(snapshot); harness.open(); @@ -1078,52 +1045,20 @@ public class CEPOperatorTest extends TestLogger { assertEquals(end, patternMap.get("end").get(0)); } - private OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> getCepTestHarness(boolean isProcessingTime) throws Exception { - KeySelector<Event, Integer> keySelector = new TestKeySelector(); - - return new KeyedOneInputStreamOperatorTestHarness<>( - getKeyedCepOperator(isProcessingTime), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); - } - - private OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> getCepTestHarness( - KeyedCEPPatternOperator<Event, Integer> cepOperator) throws Exception { - KeySelector<Event, Integer> keySelector = new TestKeySelector(); - - return new KeyedOneInputStreamOperatorTestHarness<>( - cepOperator, - keySelector, - BasicTypeInfo.INT_TYPE_INFO); - } - - private KeyedCEPPatternOperator<Event, Integer> getKeyedCepOperator( + private SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperator( boolean isProcessingTime) { - - return new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - isProcessingTime, - IntSerializer.INSTANCE, - new NFAFactory(), - true, - null); + return CepOperatorTestUtilities.getKeyedCepOpearator(isProcessingTime, new NFAFactory()); } - private KeyedCEPPatternOperator<Event, Integer> getKeyedCepOperatorWithComparator( + private SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperatorWithComparator( boolean isProcessingTime) { - return new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - isProcessingTime, - IntSerializer.INSTANCE, - new NFAFactory(), - true, - new org.apache.flink.cep.EventComparator<Event>() { - @Override - public int compare(Event o1, Event o2) { - return Double.compare(o1.getPrice(), o2.getPrice()); - } - }); + return CepOperatorTestUtilities.getKeyedCepOpearator(isProcessingTime, new NFAFactory(), new org.apache.flink.cep.EventComparator<Event>() { + @Override + public int compare(Event o1, Event o2) { + return Double.compare(o1.getPrice(), o2.getPrice()); + } + }); } private void compareMaps(List<List<Event>> actual, List<List<Event>> expected) { @@ -1181,14 +1116,12 @@ public class CEPOperatorTest extends TestLogger { } } - private static class TestKeySelector implements KeySelector<Event, Integer> { - - private static final long serialVersionUID = -4873366487571254798L; + private OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> getCepTestHarness(boolean isProcessingTime) throws Exception { + return CepOperatorTestUtilities.getCepTestHarness(getKeyedCepOpearator(isProcessingTime)); + } - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } + private SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOpearator(boolean isProcessingTime) { + return CepOperatorTestUtilities.getKeyedCepOpearator(isProcessingTime, new CEPOperatorTest.NFAFactory()); } private static class NFAFactory implements NFACompiler.NFAFactory<Event> { http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java index 9fc4337..1690284 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import static org.apache.flink.cep.operator.CepOperatorTestUtilities.getKeyedCepOpearator; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -372,13 +373,10 @@ public class CEPRescalingTest { KeySelector<Event, Integer> keySelector = new TestKeySelector(); return new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), + getKeyedCepOpearator( false, - BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()), new NFAFactory(), - true, - null), + BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())), keySelector, BasicTypeInfo.INT_TYPE_INFO, maxParallelism, http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java new file mode 100644 index 0000000..17d6656 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.EventComparator; +import org.apache.flink.cep.PatternSelectFunction; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import java.util.List; +import java.util.Map; + +/** + * Utility methods for creating test {@link AbstractKeyedCEPPatternOperator}. + */ +public class CepOperatorTestUtilities { + + private static class TestKeySelector implements KeySelector<Event, Integer> { + + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + } + + public static OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> getCepTestHarness( + SelectCepOperator<Event, Integer, Map<String, List<Event>>> cepOperator) throws Exception { + KeySelector<Event, Integer> keySelector = new TestKeySelector(); + + return new KeyedOneInputStreamOperatorTestHarness<>( + cepOperator, + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + } + + public static SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOpearator( + boolean isProcessingTime, + NFACompiler.NFAFactory<Event> nfaFactory) { + return getKeyedCepOpearator(isProcessingTime, nfaFactory, IntSerializer.INSTANCE, null); + } + + public static SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOpearator( + boolean isProcessingTime, + NFACompiler.NFAFactory<Event> nfaFactory, + EventComparator<Event> comparator) { + return getKeyedCepOpearator(isProcessingTime, nfaFactory, IntSerializer.INSTANCE, comparator); + } + + public static <K> SelectCepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator( + boolean isProcessingTime, + NFACompiler.NFAFactory<Event> nfaFactory, + TypeSerializer<K> keySerializer, + EventComparator<Event> comparator) { + + return getKeyedCepOpearator(isProcessingTime, nfaFactory, keySerializer, true, comparator); + } + + public static <K> SelectCepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator( + boolean isProcessingTime, + NFACompiler.NFAFactory<Event> nfaFactory, + TypeSerializer<K> keySerializer) { + + return getKeyedCepOpearator(isProcessingTime, nfaFactory, keySerializer, true, null); + } + + public static <K> SelectCepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator( + boolean isProcessingTime, + NFACompiler.NFAFactory<Event> nfaFactory, + TypeSerializer<K> keySerializer, + boolean migratingFromOldKeyedOperator, + EventComparator<Event> comparator) { + return new SelectCepOperator<>( + Event.createTypeSerializer(), + isProcessingTime, + keySerializer, + nfaFactory, + migratingFromOldKeyedOperator, + comparator, + new PatternSelectFunction<Event, Map<String, List<Event>>>() { + @Override + public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception { + return pattern; + } + }); + } + + private CepOperatorTestUtilities() { + } +}