Repository: flink
Updated Branches:
  refs/heads/master 9995588c8 -> 6ed5815e8


http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
new file mode 100644
index 0000000..b46b801
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.PatternTimeoutFunction;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Version of {@link AbstractKeyedCEPPatternOperator} that applies the given {@link PatternSelectFunction}
+ * to fully matched event patterns and the given {@link PatternTimeoutFunction} to timed-out ones.
+ * The results produced for timed-out patterns are emitted to a side output.
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT1> Type of the output elements
+ * @param <OUT2> Type of the timed-out output elements
+ * @param <KEY> Type of the key on which the input stream is keyed
+ */
+public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
+       extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> {
+
+       /** Tag of the side output to which the results of the timeout function are emitted. */
+       private final OutputTag<OUT2> timeoutedOutputTag;
+
+       /**
+        * Creates the operator. The two pattern functions are bundled into a single {@link SelectWrapper}
+        * that is handed to the parent class as the user function; all remaining arguments except
+        * {@code outputTag} are passed to the parent constructor unchanged.
+        *
+        * @param flatSelectFunction function applied to every fully matched pattern
+        * @param flatTimeoutFunction function applied to every timed-out pattern
+        * @param outputTag tag of the side output on which results of the timeout function are emitted
+        */
+       public SelectTimeoutCepOperator(
+               TypeSerializer<IN> inputSerializer,
+               boolean isProcessingTime,
+               TypeSerializer<KEY> keySerializer,
+               NFACompiler.NFAFactory<IN> nfaFactory,
+               boolean migratingFromOldKeyedOperator,
+               final EventComparator<IN> comparator,
+               PatternSelectFunction<IN, OUT1> flatSelectFunction,
+               PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction,
+               OutputTag<OUT2> outputTag) {
+               super(
+                       inputSerializer,
+                       isProcessingTime,
+                       keySerializer,
+                       nfaFactory,
+                       migratingFromOldKeyedOperator,
+                       comparator,
+                       new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction));
+               this.timeoutedOutputTag = outputTag;
+       }
+
+       /**
+        * Applies the select function to each fully matched pattern and emits the result on the regular
+        * output, stamped with the given timestamp.
+        */
+       @Override
+       protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception {
+               for (Map<String, List<IN>> match : matchesSequence) {
+                       output.collect(
+                               new StreamRecord<>(
+                                       getUserFunction().getFlatSelectFunction().select(match),
+                                       timestamp));
+               }
+       }
+
+       /**
+        * Applies the timeout function to each timed-out pattern (the map in {@code f0} together with the
+        * {@code Long} in {@code f1}) and emits the result on the side output identified by the tag given
+        * at construction time, stamped with the given timestamp.
+        */
+       @Override
+       protected void processTimeoutedSequence(
+               Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception {
+               for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) {
+                       output.collect(timeoutedOutputTag,
+                               new StreamRecord<>(
+                                       getUserFunction().getFlatTimeoutFunction().timeout(match.f0, match.f1),
+                                       timestamp));
+               }
+       }
+
+       /**
+        * Wrapper that enables storing {@link PatternSelectFunction} and {@link PatternTimeoutFunction} in one udf.
+        *
+        * @param <IN> Type of the input elements
+        * @param <OUT1> Type of the output elements
+        * @param <OUT2> Type of the timed-out output elements
+        */
+       @Internal
+       public static class SelectWrapper<IN, OUT1, OUT2> implements Function {
+
+               private static final long serialVersionUID = -8320546120157150202L;
+
+               /** Function applied to fully matched patterns. */
+               private final PatternSelectFunction<IN, OUT1> flatSelectFunction;
+
+               /** Function applied to timed-out patterns. */
+               private final PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction;
+
+               /** Returns the wrapped select function. */
+               PatternSelectFunction<IN, OUT1> getFlatSelectFunction() {
+                       return flatSelectFunction;
+               }
+
+               /** Returns the wrapped timeout function. */
+               PatternTimeoutFunction<IN, OUT2> getFlatTimeoutFunction() {
+                       return flatTimeoutFunction;
+               }
+
+               /**
+                * Bundles the two given functions; neither is copied or validated here.
+                *
+                * @param flatSelectFunction function for fully matched patterns
+                * @param flatTimeoutFunction function for timed-out patterns
+                */
+               public SelectWrapper(
+                       PatternSelectFunction<IN, OUT1> flatSelectFunction,
+                       PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction) {
+                       this.flatSelectFunction = flatSelectFunction;
+                       this.flatTimeoutFunction = flatTimeoutFunction;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
deleted file mode 100644
index ca58955..0000000
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.types.Either;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * CEP pattern operator which returns fully and partially matched (timed-out) event patterns stored in a
- * {@link Map}. The events are indexed by the event names associated in the pattern specification. The
- * operator works on keyed input data.
- *
- * <p>Both kinds of result share the single regular output: a fully matched pattern is emitted as
- * {@code Either.Right}, a timed-out partial pattern (a {@code Tuple2} of the pattern map and a
- * {@code Long}) as {@code Either.Left}. {@code processEvent} calls {@code nfa.process(event, timestamp)}
- * and {@code advanceTime} calls {@code nfa.process(null, timestamp)}; in both cases the matched
- * sequences are emitted first, then the timed-out ones, and every emitted record carries the
- * timestamp that was passed in. Each emit helper reuses one {@code StreamRecord} for all elements
- * of the sequence it is given.
- *
- * @param <IN> Type of the input events
- * @param <KEY> Type of the key
- */
-public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends 
AbstractKeyedCEPPatternOperator<IN, KEY, Either<Tuple2<Map<String, List<IN>>, 
Long>, Map<String, List<IN>>>> {
-       private static final long serialVersionUID = 3570542177814518158L;
-
-       public TimeoutKeyedCEPPatternOperator(
-                       TypeSerializer<IN> inputSerializer,
-                       boolean isProcessingTime,
-                       TypeSerializer<KEY> keySerializer,
-                       NFACompiler.NFAFactory<IN> nfaFactory,
-                       boolean migratingFromOldKeyedOperator,
-                       EventComparator<IN> comparator) {
-
-               super(inputSerializer, isProcessingTime, keySerializer, 
nfaFactory, migratingFromOldKeyedOperator, comparator);
-       }
-
-       @Override
-       protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
-               Tuple2<Collection<Map<String, List<IN>>>, 
Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns =
-                       nfa.process(event, timestamp);
-
-               emitMatchedSequences(patterns.f0, timestamp);
-               emitTimedOutSequences(patterns.f1, timestamp);
-       }
-
-       @Override
-       protected void advanceTime(NFA<IN> nfa, long timestamp) {
-               Tuple2<Collection<Map<String, List<IN>>>, 
Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns =
-                       nfa.process(null, timestamp);
-
-               emitMatchedSequences(patterns.f0, timestamp);
-               emitTimedOutSequences(patterns.f1, timestamp);
-       }
-
-       private void emitTimedOutSequences(Iterable<Tuple2<Map<String, 
List<IN>>, Long>> timedOutSequences, long timestamp) {
-               StreamRecord<Either<Tuple2<Map<String, List<IN>>, Long>, 
Map<String, List<IN>>>> streamRecord =
-                       new StreamRecord<>(null, timestamp);
-
-               for (Tuple2<Map<String, List<IN>>, Long> partialPattern: 
timedOutSequences) {
-                       streamRecord.replace(Either.Left(partialPattern));
-                       output.collect(streamRecord);
-               }
-       }
-
-       protected void emitMatchedSequences(Iterable<Map<String, List<IN>>> 
matchedSequences, long timestamp) {
-               StreamRecord<Either<Tuple2<Map<String, List<IN>>, Long>, 
Map<String, List<IN>>>> streamRecord =
-                       new StreamRecord<>(null, timestamp);
-
-               for (Map<String, List<IN>> matchedPattern : matchedSequences) {
-                       streamRecord.replace(Either.Right(matchedPattern));
-                       output.collect(streamRecord);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java
new file mode 100644
index 0000000..5336543
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * {@link Collector} facade over an {@link Output}, intended for user functions that emit through a
+ * {@link Collector}. The timestamp to attach to emitted elements must be set on this
+ * {@link TimestampedSideOutputCollector} before it is handed to the user function; most operators
+ * would take it from the incoming
+ * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
+ *
+ * <p>Every element is emitted into the side output identified by the {@link OutputTag} supplied at
+ * construction time.
+ *
+ * @param <T> The type of the elements that can be emitted.
+ */
+@Internal
+public class TimestampedSideOutputCollector<T> implements Collector<T> {
+
+       /** Side output that all collected elements are routed to. */
+       private final OutputTag<T> outputTag;
+
+       /** Underlying output the records are forwarded to. */
+       private final Output<?> output;
+
+       /** Single record instance that is re-filled for every emitted element. */
+       private final StreamRecord<T> reuse = new StreamRecord<>(null);
+
+       /**
+        * Creates a new {@link TimestampedSideOutputCollector} on top of the given {@link Output}; collected
+        * elements go to the side output that belongs to the given {@link OutputTag}.
+        */
+       public TimestampedSideOutputCollector(OutputTag<T> outputTag, Output<?> output) {
+               this.outputTag = outputTag;
+               this.output = output;
+       }
+
+       /** Puts the element into the reused record and emits that record to the side output. */
+       @Override
+       public void collect(T record) {
+               final StreamRecord<T> element = reuse.replace(record);
+               output.collect(outputTag, element);
+       }
+
+       /**
+        * Copies the timestamp state of the given record: its timestamp if it has one, otherwise the
+        * timestamp of the reused record is erased.
+        */
+       public void setTimestamp(StreamRecord<?> timestampBase) {
+               if (!timestampBase.hasTimestamp()) {
+                       reuse.eraseTimestamp();
+                       return;
+               }
+               reuse.setTimestamp(timestampBase.getTimestamp());
+       }
+
+       /** Sets the timestamp of the reused record to the given value. */
+       public void setAbsoluteTimestamp(long timestamp) {
+               reuse.setTimestamp(timestamp);
+       }
+
+       /** Erases the timestamp of the reused record. */
+       public void eraseTimestamp() {
+               reuse.eraseTimestamp();
+       }
+
+       /** Closes the wrapped {@link Output}. */
+       @Override
+       public void close() {
+               output.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 95987c2..843d668 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -20,8 +20,6 @@ package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.ByteSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.NullByteKeySelector;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
@@ -63,15 +61,6 @@ public class CEPMigration11to13Test {
        @Test
        public void testKeyedCEPOperatorMigratation() throws Exception {
 
-               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
-                       private static final long serialVersionUID = 
-4873366487571254798L;
-
-                       @Override
-                       public Integer getKey(Event value) throws Exception {
-                               return value.getId();
-                       }
-               };
-
                final Event startEvent = new Event(42, "start", 1.0);
                final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
                final Event endEvent = new Event(42, "end", 1.0);
@@ -79,7 +68,7 @@ public class CEPMigration11to13Test {
                // uncomment these lines for regenerating the snapshot on Flink 
1.1
                /*
                OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = new OneInputStreamOperatorTestHarness<>(
-                               new KeyedCEPPatternOperator<>(
+                               new KeyedCepOperator<>(
                                                Event.createTypeSerializer(),
                                                false,
                                                keySelector,
@@ -104,17 +93,8 @@ public class CEPMigration11to13Test {
                harness.close();
                */
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
-                               new KeyedOneInputStreamOperatorTestHarness<>(
-                                               new KeyedCEPPatternOperator<>(
-                                                               
Event.createTypeSerializer(),
-                                                               false,
-                                                               
IntSerializer.INSTANCE,
-                                                               new 
NFAFactory(),
-                                                               true,
-                                                               null),
-                                               keySelector,
-                                               BasicTypeInfo.INT_TYPE_INFO);
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
+                       CepOperatorTestUtilities.getKeyedCepOpearator(false, 
new NFAFactory()));
 
                try {
                        harness.setup();
@@ -158,16 +138,9 @@ public class CEPMigration11to13Test {
                        OperatorStateHandles snapshot = harness.snapshot(1L, 
1L);
                        harness.close();
 
-                       harness = new KeyedOneInputStreamOperatorTestHarness<>(
-                               new KeyedCEPPatternOperator<>(
-                                       Event.createTypeSerializer(),
-                                       false,
-                                       IntSerializer.INSTANCE,
-                                       new NFAFactory(),
-                                       true,
-                                       null),
-                               keySelector,
-                               BasicTypeInfo.INT_TYPE_INFO);
+                       harness = 
CepOperatorTestUtilities.getCepTestHarness(CepOperatorTestUtilities.getKeyedCepOpearator(
+                               false,
+                               new NFAFactory()));
 
                        harness.setup();
                        harness.initializeState(snapshot);
@@ -234,16 +207,10 @@ public class CEPMigration11to13Test {
                NullByteKeySelector keySelector = new NullByteKeySelector();
 
                OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>(
-                                               new KeyedCEPPatternOperator<>(
-                                                               
Event.createTypeSerializer(),
-                                                               false,
-                                                               
ByteSerializer.INSTANCE,
-                                                               new 
NFAFactory(),
-                                                               false,
-                                                               null),
-                                               keySelector,
-                                               BasicTypeInfo.BYTE_TYPE_INFO);
+                       new KeyedOneInputStreamOperatorTestHarness<Byte, Event, 
Map<String, List<Event>>>(
+                               
CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory(), 
ByteSerializer.INSTANCE, false, null),
+                               keySelector,
+                               BasicTypeInfo.BYTE_TYPE_INFO);
 
                try {
                        harness.setup();
@@ -288,13 +255,7 @@ public class CEPMigration11to13Test {
                        harness.close();
 
                        harness = new 
KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>(
-                               new KeyedCEPPatternOperator<>(
-                                       Event.createTypeSerializer(),
-                                       false,
-                                       ByteSerializer.INSTANCE,
-                                       new NFAFactory(),
-                                       false,
-                                       null),
+                               
CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory(), 
ByteSerializer.INSTANCE),
                                keySelector,
                                BasicTypeInfo.BYTE_TYPE_INFO);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index 0eeff09..cf3c921 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
@@ -48,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static 
org.apache.flink.cep.operator.CepOperatorTestUtilities.getKeyedCepOpearator;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -100,13 +100,7 @@ public class CEPMigrationTest {
 
                OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
-                                               new KeyedCEPPatternOperator<>(
-                                                               
Event.createTypeSerializer(),
-                                                               false,
-                                                               
IntSerializer.INSTANCE,
-                                                               new 
NFAFactory(),
-                                                               true,
-                                                               null),
+                                       getKeyedCepOpearator(false, new 
NFAFactory()),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
 
@@ -151,13 +145,7 @@ public class CEPMigrationTest {
 
                OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
-                                               new KeyedCEPPatternOperator<>(
-                                                               
Event.createTypeSerializer(),
-                                                               false,
-                                                               
IntSerializer.INSTANCE,
-                                                               new 
NFAFactory(),
-                                                               true,
-                                                               null),
+                                               getKeyedCepOpearator(false, new 
NFAFactory()),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
 
@@ -221,13 +209,7 @@ public class CEPMigrationTest {
                        harness.close();
 
                        harness = new KeyedOneInputStreamOperatorTestHarness<>(
-                               new KeyedCEPPatternOperator<>(
-                                       Event.createTypeSerializer(),
-                                       false,
-                                       IntSerializer.INSTANCE,
-                                       new NFAFactory(),
-                                       true,
-                                       null),
+                               getKeyedCepOpearator(false, new NFAFactory()),
                                keySelector,
                                BasicTypeInfo.INT_TYPE_INFO);
 
@@ -282,13 +264,7 @@ public class CEPMigrationTest {
 
                OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
-                                               new KeyedCEPPatternOperator<>(
-                                                               
Event.createTypeSerializer(),
-                                                               false,
-                                                               
IntSerializer.INSTANCE,
-                                                               new 
NFAFactory(),
-                                                               true,
-                                                               null),
+                                       getKeyedCepOpearator(false, new 
NFAFactory()),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
 
@@ -331,13 +307,7 @@ public class CEPMigrationTest {
 
                OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
-                                               new KeyedCEPPatternOperator<>(
-                                                               
Event.createTypeSerializer(),
-                                                               false,
-                                                               
IntSerializer.INSTANCE,
-                                                               new 
NFAFactory(),
-                                                               true,
-                                                               null),
+                                       getKeyedCepOpearator(false, new 
NFAFactory()),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
 
@@ -415,13 +385,7 @@ public class CEPMigrationTest {
                        harness.close();
 
                        harness = new KeyedOneInputStreamOperatorTestHarness<>(
-                               new KeyedCEPPatternOperator<>(
-                                       Event.createTypeSerializer(),
-                                       false,
-                                       IntSerializer.INSTANCE,
-                                       new NFAFactory(),
-                                       true,
-                                       null),
+                               getKeyedCepOpearator(false, new NFAFactory()),
                                keySelector,
                                BasicTypeInfo.INT_TYPE_INFO);
 
@@ -475,13 +439,7 @@ public class CEPMigrationTest {
 
                OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
-                                               new KeyedCEPPatternOperator<>(
-                                                               
Event.createTypeSerializer(),
-                                                               false,
-                                                               
IntSerializer.INSTANCE,
-                                                               new 
SinglePatternNFAFactory(),
-                                                               true,
-                                                               null),
+                                               getKeyedCepOpearator(false, new 
SinglePatternNFAFactory()),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
 
@@ -515,13 +473,7 @@ public class CEPMigrationTest {
 
                OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
-                                               new KeyedCEPPatternOperator<>(
-                                                               
Event.createTypeSerializer(),
-                                                               false,
-                                                               
IntSerializer.INSTANCE,
-                                                               new 
SinglePatternNFAFactory(),
-                                                               true,
-                                                               null),
+                                               getKeyedCepOpearator(false, new 
SinglePatternNFAFactory()),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 8cf67ad..14cdd53 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.cep.Event;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.PatternTimeoutFunction;
 import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
@@ -40,7 +42,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.types.Either;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -66,7 +68,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.validateMockitoUsage;
 
 /**
- * Tests for {@link KeyedCEPPatternOperator} and {@link TimeoutKeyedCEPPatternOperator}.
+ * Tests for {@link AbstractKeyedCEPPatternOperator}.
  */
 public class CEPOperatorTest extends TestLogger {
 
@@ -240,8 +242,6 @@ public class CEPOperatorTest extends TestLogger {
         */
        @Test
        public void testKeyedAdvancingTimeWithoutElements() throws Exception {
-               final KeySelector<Event, Integer> keySelector = new 
TestKeySelector();
-
                final Event startEvent = new Event(42, "start", 1.0);
                final long watermarkTimestamp1 = 5L;
                final long watermarkTimestamp2 = 13L;
@@ -249,21 +249,43 @@ public class CEPOperatorTest extends TestLogger {
                final Map<String, List<Event>> expectedSequence = new 
HashMap<>(2);
                expectedSequence.put("start", 
Collections.<Event>singletonList(startEvent));
 
-               OneInputStreamOperatorTestHarness<Event, 
Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>> 
harness = new KeyedOneInputStreamOperatorTestHarness<>(
-                       new TimeoutKeyedCEPPatternOperator<>(
-                               Event.createTypeSerializer(),
-                               false,
-                               IntSerializer.INSTANCE,
-                               new NFAFactory(true),
-                               true,
-                               null),
-                       keySelector,
-                       BasicTypeInfo.INT_TYPE_INFO);
+               final OutputTag<Tuple2<Map<String, List<Event>>, Long>> 
timeouted =
+                       new OutputTag<Tuple2<Map<String, List<Event>>, 
Long>>("timeouted") {};
+               final KeyedOneInputStreamOperatorTestHarness<Integer, Event, 
Map<String, List<Event>>> harness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               new SelectTimeoutCepOperator<>(
+                                       Event.createTypeSerializer(),
+                                       false,
+                                       IntSerializer.INSTANCE,
+                                       new NFAFactory(true),
+                                       true,
+                                       null,
+                                       new PatternSelectFunction<Event, 
Map<String, List<Event>>>() {
+                                               @Override
+                                               public Map<String, List<Event>> 
select(Map<String, List<Event>> pattern) throws Exception {
+                                                       return pattern;
+                                               }
+                                       },
+                                       new PatternTimeoutFunction<Event, 
Tuple2<Map<String, List<Event>>, Long>>() {
+                                               @Override
+                                               public Tuple2<Map<String, 
List<Event>>, Long> timeout(
+                                                       Map<String, 
List<Event>> pattern,
+                                                       long timeoutTimestamp) 
throws Exception {
+                                                       return 
Tuple2.of(pattern, timeoutTimestamp);
+                                               }
+                                       },
+                                       timeouted
+                               ), new KeySelector<Event, Integer>() {
+                               @Override
+                               public Integer getKey(Event value) throws 
Exception {
+                                       return value.getId();
+                               }
+                       }, BasicTypeInfo.INT_TYPE_INFO);
 
                try {
                        harness.setup(
                                new KryoSerializer<>(
-                                       (Class<Either<Tuple2<Map<String, 
List<Event>>, Long>, Map<String, List<Event>>>>) (Object) Either.class,
+                                       (Class<Map<String, List<Event>>>) 
(Object) Map.class,
                                        new ExecutionConfig()));
                        harness.open();
 
@@ -272,8 +294,10 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processWatermark(new 
Watermark(watermarkTimestamp2));
 
                        Queue<Object> result = harness.getOutput();
+                       Queue<StreamRecord<Tuple2<Map<String, List<Event>>, 
Long>>> sideOutput = harness.getSideOutput(timeouted);
 
-                       assertEquals(3L, result.size());
+                       assertEquals(2L, result.size());
+                       assertEquals(1L, sideOutput.size());
 
                        Object watermark1 = result.poll();
 
@@ -281,19 +305,7 @@ public class CEPOperatorTest extends TestLogger {
 
                        assertEquals(watermarkTimestamp1, ((Watermark) 
watermark1).getTimestamp());
 
-                       Object resultObject = result.poll();
-
-                       assertTrue(resultObject instanceof StreamRecord);
-
-                       StreamRecord<Either<Tuple2<Map<String, List<Event>>, 
Long>, Map<String, List<Event>>>> streamRecord =
-                                       (StreamRecord<Either<Tuple2<Map<String, 
List<Event>>, Long>, Map<String, List<Event>>>>) resultObject;
-
-                       assertTrue(streamRecord.getValue() instanceof 
Either.Left);
-
-                       Either.Left<Tuple2<Map<String, List<Event>>, Long>, 
Map<String, List<Event>>> left =
-                       (Either.Left<Tuple2<Map<String, List<Event>>, Long>, 
Map<String, List<Event>>>) streamRecord.getValue();
-
-                       Tuple2<Map<String, List<Event>>, Long> leftResult = 
left.left();
+                       Tuple2<Map<String, List<Event>>, Long> leftResult = 
sideOutput.poll().getValue();
 
                        assertEquals(watermarkTimestamp2, (long) leftResult.f1);
                        assertEquals(expectedSequence, leftResult.f0);
@@ -310,14 +322,12 @@ public class CEPOperatorTest extends TestLogger {
 
        @Test
        public void testKeyedCEPOperatorNFAUpdate() throws Exception {
-               KeyedCEPPatternOperator<Event, Integer> operator = new 
KeyedCEPPatternOperator<>(
-                       Event.createTypeSerializer(),
-                       true,
-                       IntSerializer.INSTANCE,
-                       new SimpleNFAFactory(),
+
+               SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
operator = CepOperatorTestUtilities.getKeyedCepOpearator(
                        true,
-                       null);
-               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
+                       new SimpleNFAFactory());
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
+                       operator);
 
                try {
                        harness.open();
@@ -332,14 +342,8 @@ public class CEPOperatorTest extends TestLogger {
                        OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
-                       operator = new KeyedCEPPatternOperator<>(
-                               Event.createTypeSerializer(),
-                               true,
-                               IntSerializer.INSTANCE,
-                               new SimpleNFAFactory(),
-                               true,
-                               null);
-                       harness = getCepTestHarness(operator);
+                       operator = 
CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory());
+                       harness = 
CepOperatorTestUtilities.getCepTestHarness(operator);
 
                        harness.setup();
                        harness.initializeState(snapshot);
@@ -349,14 +353,8 @@ public class CEPOperatorTest extends TestLogger {
                        OperatorStateHandles snapshot2 = harness.snapshot(0L, 
0L);
                        harness.close();
 
-                       operator = new KeyedCEPPatternOperator<>(
-                               Event.createTypeSerializer(),
-                               true,
-                               IntSerializer.INSTANCE,
-                               new SimpleNFAFactory(),
-                               true,
-                               null);
-                       harness = getCepTestHarness(operator);
+                       operator = 
CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory());
+                       harness = 
CepOperatorTestUtilities.getCepTestHarness(operator);
 
                        harness.setup();
                        harness.initializeState(snapshot2);
@@ -384,14 +382,11 @@ public class CEPOperatorTest extends TestLogger {
                RocksDBStateBackend rocksDBStateBackend = new 
RocksDBStateBackend(new MemoryStateBackend());
                rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 
-               KeyedCEPPatternOperator<Event, Integer> operator = new 
KeyedCEPPatternOperator<>(
-                       Event.createTypeSerializer(),
+               SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
operator = CepOperatorTestUtilities.getKeyedCepOpearator(
                        true,
-                       IntSerializer.INSTANCE,
-                       new SimpleNFAFactory(),
-                       true,
-                       null);
-               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
+                       new SimpleNFAFactory());
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
+                       operator);
 
                try {
                        harness.setStateBackend(rocksDBStateBackend);
@@ -408,14 +403,8 @@ public class CEPOperatorTest extends TestLogger {
                        OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
-                       operator = new KeyedCEPPatternOperator<>(
-                               Event.createTypeSerializer(),
-                               true,
-                               IntSerializer.INSTANCE,
-                               new SimpleNFAFactory(),
-                               true,
-                               null);
-                       harness = getCepTestHarness(operator);
+                       operator = 
CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory());
+                       harness = 
CepOperatorTestUtilities.getCepTestHarness(operator);
 
                        rocksDBStateBackend = new RocksDBStateBackend(new 
MemoryStateBackend());
                        rocksDBStateBackend.setDbStoragePath(rocksDbPath);
@@ -428,14 +417,8 @@ public class CEPOperatorTest extends TestLogger {
                        OperatorStateHandles snapshot2 = harness.snapshot(0L, 
0L);
                        harness.close();
 
-                       operator = new KeyedCEPPatternOperator<>(
-                               Event.createTypeSerializer(),
-                               true,
-                               IntSerializer.INSTANCE,
-                               new SimpleNFAFactory(),
-                               true,
-                               null);
-                       harness = getCepTestHarness(operator);
+                       operator = 
CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory());
+                       harness = 
CepOperatorTestUtilities.getCepTestHarness(operator);
 
                        rocksDBStateBackend = new RocksDBStateBackend(new 
MemoryStateBackend());
                        rocksDBStateBackend.setDbStoragePath(rocksDbPath);
@@ -461,14 +444,10 @@ public class CEPOperatorTest extends TestLogger {
 
        @Test
        public void testKeyedCEPOperatorNFAUpdateTimes() throws Exception {
-               KeyedCEPPatternOperator<Event, Integer> operator = new 
KeyedCEPPatternOperator<>(
-                       Event.createTypeSerializer(),
-                       true,
-                       IntSerializer.INSTANCE,
-                       new SimpleNFAFactory(),
+               SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
operator = CepOperatorTestUtilities.getKeyedCepOpearator(
                        true,
-                       null);
-               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
+                       new SimpleNFAFactory());
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
                try {
                        harness.open();
@@ -507,14 +486,11 @@ public class CEPOperatorTest extends TestLogger {
                RocksDBStateBackend rocksDBStateBackend = new 
RocksDBStateBackend(new MemoryStateBackend());
                rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 
-               KeyedCEPPatternOperator<Event, Integer> operator = new 
KeyedCEPPatternOperator<>(
-                       Event.createTypeSerializer(),
-                       true,
-                       IntSerializer.INSTANCE,
-                       new SimpleNFAFactory(),
+               SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
operator = CepOperatorTestUtilities.getKeyedCepOpearator(
                        true,
-                       null);
-               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
+                       new SimpleNFAFactory());
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
+                       operator);
 
                try {
                        harness.setStateBackend(rocksDBStateBackend);
@@ -561,8 +537,8 @@ public class CEPOperatorTest extends TestLogger {
 
                Event startEventK2 = new Event(43, "start", 1.0);
 
-               KeyedCEPPatternOperator<Event, Integer> operator = 
getKeyedCepOperator(false);
-               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
+               SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
operator = getKeyedCepOperator(false);
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
                try {
                        harness.open();
@@ -606,8 +582,8 @@ public class CEPOperatorTest extends TestLogger {
                        OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
-                       KeyedCEPPatternOperator<Event, Integer> operator2 = 
getKeyedCepOperator(false);
-                       harness = getCepTestHarness(operator2);
+                       SelectCepOperator<Event, Integer, Map<String, 
List<Event>>> operator2 = getKeyedCepOperator(false);
+                       harness = 
CepOperatorTestUtilities.getCepTestHarness(operator2);
                        harness.setup();
                        harness.initializeState(snapshot);
                        harness.open();
@@ -657,14 +633,10 @@ public class CEPOperatorTest extends TestLogger {
                Event middle1Event3 = new Event(41, "a", 4.0);
                Event middle2Event1 = new Event(41, "b", 5.0);
 
-               KeyedCEPPatternOperator<Event, Integer> operator = new 
KeyedCEPPatternOperator<>(
-                               Event.createTypeSerializer(),
-                               false,
-                               IntSerializer.INSTANCE,
-                               new ComplexNFAFactory(),
-                               true,
-                               null);
-               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
+               SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+                       false,
+                       new ComplexNFAFactory());
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
                try {
                        harness.open();
@@ -756,8 +728,8 @@ public class CEPOperatorTest extends TestLogger {
 
                Event startEventK2 = new Event(43, "start", 1.0);
 
-               KeyedCEPPatternOperator<Event, Integer> operator = 
getKeyedCepOperator(true);
-               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
+               SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
operator = getKeyedCepOperator(true);
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
                try {
                        harness.open();
@@ -784,8 +756,8 @@ public class CEPOperatorTest extends TestLogger {
                        OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
-                       KeyedCEPPatternOperator<Event, Integer> operator2 = 
getKeyedCepOperator(true);
-                       harness = getCepTestHarness(operator2);
+                       SelectCepOperator<Event, Integer, Map<String, 
List<Event>>> operator2 = getKeyedCepOperator(true);
+                       harness = 
CepOperatorTestUtilities.getCepTestHarness(operator2);
                        harness.setup();
                        harness.initializeState(snapshot);
                        harness.open();
@@ -876,23 +848,18 @@ public class CEPOperatorTest extends TestLogger {
                        }
                });
 
-               KeyedCEPPatternOperator<Event, Integer> operator = new 
KeyedCEPPatternOperator<>(
-                               Event.createTypeSerializer(),
-                               false,
-                               IntSerializer.INSTANCE,
-                               new NFACompiler.NFAFactory<Event>() {
+               SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+                       false,
+                       new NFACompiler.NFAFactory<Event>() {
+                               private static final long serialVersionUID = 
477082663248051994L;
 
-                                       private static final long 
serialVersionUID = 477082663248051994L;
-
-                                       @Override
-                                       public NFA<Event> createNFA() {
-                                               return 
NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-                                       }
-                               },
-                               true,
-                               null);
+                               @Override
+                               public NFA<Event> createNFA() {
+                                       return NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+                               }
+                       });
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
                try {
                        harness.setStateBackend(rocksDBStateBackend);
@@ -952,8 +919,8 @@ public class CEPOperatorTest extends TestLogger {
 
                Event startEventK2 = new Event(43, "start", 1.0);
 
-               KeyedCEPPatternOperator<Event, Integer> operator = 
getKeyedCepOperatorWithComparator(true);
-               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
+               SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
operator = getKeyedCepOperatorWithComparator(true);
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
                try {
                        harness.open();
@@ -980,8 +947,8 @@ public class CEPOperatorTest extends TestLogger {
                        OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
-                       KeyedCEPPatternOperator<Event, Integer> operator2 = 
getKeyedCepOperatorWithComparator(true);
-                       harness = getCepTestHarness(operator2);
+                       SelectCepOperator<Event, Integer, Map<String, 
List<Event>>> operator2 = getKeyedCepOperatorWithComparator(true);
+                       harness = 
CepOperatorTestUtilities.getCepTestHarness(operator2);
                        harness.setup();
                        harness.initializeState(snapshot);
                        harness.open();
@@ -1009,8 +976,8 @@ public class CEPOperatorTest extends TestLogger {
 
                Event startEventK2 = new Event(43, "start", 1.0);
 
-               KeyedCEPPatternOperator<Event, Integer> operator = 
getKeyedCepOperatorWithComparator(false);
-               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
+               SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
operator = getKeyedCepOperatorWithComparator(false);
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
                try {
                        harness.open();
@@ -1041,8 +1008,8 @@ public class CEPOperatorTest extends TestLogger {
                        OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
-                       KeyedCEPPatternOperator<Event, Integer> operator2 = 
getKeyedCepOperatorWithComparator(false);
-                       harness = getCepTestHarness(operator2);
+                       SelectCepOperator<Event, Integer, Map<String, 
List<Event>>> operator2 = getKeyedCepOperatorWithComparator(false);
+                       harness = 
CepOperatorTestUtilities.getCepTestHarness(operator2);
                        harness.setup();
                        harness.initializeState(snapshot);
                        harness.open();
@@ -1078,52 +1045,20 @@ public class CEPOperatorTest extends TestLogger {
                assertEquals(end, patternMap.get("end").get(0));
        }
 
-       private OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> getCepTestHarness(boolean isProcessingTime) throws Exception {
-               KeySelector<Event, Integer> keySelector = new TestKeySelector();
-
-               return new KeyedOneInputStreamOperatorTestHarness<>(
-                       getKeyedCepOperator(isProcessingTime),
-                       keySelector,
-                       BasicTypeInfo.INT_TYPE_INFO);
-       }
-
-       private OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> getCepTestHarness(
-                       KeyedCEPPatternOperator<Event, Integer> cepOperator) 
throws Exception {
-               KeySelector<Event, Integer> keySelector = new TestKeySelector();
-
-               return new KeyedOneInputStreamOperatorTestHarness<>(
-                       cepOperator,
-                       keySelector,
-                       BasicTypeInfo.INT_TYPE_INFO);
-       }
-
-       private KeyedCEPPatternOperator<Event, Integer> getKeyedCepOperator(
+       private SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
getKeyedCepOperator(
                boolean isProcessingTime) {
-
-               return new KeyedCEPPatternOperator<>(
-                       Event.createTypeSerializer(),
-                       isProcessingTime,
-                       IntSerializer.INSTANCE,
-                       new NFAFactory(),
-                       true,
-                       null);
+               return 
CepOperatorTestUtilities.getKeyedCepOpearator(isProcessingTime, new 
NFAFactory());
        }
 
-       private KeyedCEPPatternOperator<Event, Integer> 
getKeyedCepOperatorWithComparator(
+       private SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
getKeyedCepOperatorWithComparator(
                boolean isProcessingTime) {
 
-               return new KeyedCEPPatternOperator<>(
-                       Event.createTypeSerializer(),
-                       isProcessingTime,
-                       IntSerializer.INSTANCE,
-                       new NFAFactory(),
-                       true,
-                       new org.apache.flink.cep.EventComparator<Event>() {
-                               @Override
-                               public int compare(Event o1, Event o2) {
-                                       return Double.compare(o1.getPrice(), 
o2.getPrice());
-                               }
-                       });
+               return 
CepOperatorTestUtilities.getKeyedCepOpearator(isProcessingTime, new 
NFAFactory(), new org.apache.flink.cep.EventComparator<Event>() {
+                       @Override
+                       public int compare(Event o1, Event o2) {
+                               return Double.compare(o1.getPrice(), 
o2.getPrice());
+                       }
+               });
        }
 
        private void compareMaps(List<List<Event>> actual, List<List<Event>> 
expected) {
@@ -1181,14 +1116,12 @@ public class CEPOperatorTest extends TestLogger {
                }
        }
 
-       private static class TestKeySelector implements KeySelector<Event, Integer> {
-
-               private static final long serialVersionUID = -4873366487571254798L;
+       private OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> getCepTestHarness(boolean isProcessingTime) throws Exception {
+               return 
CepOperatorTestUtilities.getCepTestHarness(getKeyedCepOpearator(isProcessingTime));
+       }
 
-               @Override
-               public Integer getKey(Event value) throws Exception {
-                       return value.getId();
-               }
+       private SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
getKeyedCepOpearator(boolean isProcessingTime) {
+               return 
CepOperatorTestUtilities.getKeyedCepOpearator(isProcessingTime, new 
CEPOperatorTest.NFAFactory());
        }
 
        private static class NFAFactory implements 
NFACompiler.NFAFactory<Event> {

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 9fc4337..1690284 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
+import static org.apache.flink.cep.operator.CepOperatorTestUtilities.getKeyedCepOpearator;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -372,13 +373,10 @@ public class CEPRescalingTest {
 
                KeySelector<Event, Integer> keySelector = new TestKeySelector();
                return new KeyedOneInputStreamOperatorTestHarness<>(
-                       new KeyedCEPPatternOperator<>(
-                               Event.createTypeSerializer(),
+                       getKeyedCepOpearator(
                                false,
-                               
BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                new NFAFactory(),
-                               true,
-                               null),
+                               
BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism,

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
new file mode 100644
index 0000000..17d6656
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static helpers for CEP operator tests: they build {@link SelectCepOperator} instances
+ * and wrap them in a keyed test harness.
+ *
+ * <p>What the helpers do:
+ * <ul>
+ *   <li>{@code getCepTestHarness(cepOperator)} wraps the given operator in a
+ *       {@link KeyedOneInputStreamOperatorTestHarness}, keyed by {@code Event#getId()} with
+ *       {@code BasicTypeInfo.INT_TYPE_INFO} as the key type.</li>
+ *   <li>Every {@code getKeyedCepOpearator(...)} overload delegates to the five-argument
+ *       variant, which constructs a {@link SelectCepOperator} with
+ *       {@code Event.createTypeSerializer()} as the input serializer and a
+ *       {@link PatternSelectFunction} that returns the matched map unchanged.</li>
+ *   <li>Defaults filled in by the shorter overloads: {@code IntSerializer.INSTANCE} as key
+ *       serializer, {@code null} as event comparator, and {@code true} for
+ *       {@code migratingFromOldKeyedOperator}.</li>
+ * </ul>
+ *
+ * <p>NOTE(review): "Opearator" in the factory method name is a misspelling of "Operator".
+ * It is kept as-is because test classes (e.g. {@code CEPRescalingTest}) statically import
+ * the method under that name; renaming it requires updating those call sites as well.
+ *
+ * <p>The class only has static members and a private constructor, so it cannot be
+ * instantiated from outside.
+ */
+public class CepOperatorTestUtilities {
+
+       private static class TestKeySelector implements KeySelector<Event, 
Integer> {
+
+               private static final long serialVersionUID = 
-4873366487571254798L;
+
+               @Override
+               public Integer getKey(Event value) throws Exception {
+                       return value.getId(); // events are keyed by their id
+               }
+       }
+
+       public static OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> getCepTestHarness(
+               SelectCepOperator<Event, Integer, Map<String, List<Event>>> 
cepOperator) throws Exception {
+               KeySelector<Event, Integer> keySelector = new TestKeySelector();
+
+               return new KeyedOneInputStreamOperatorTestHarness<>(
+                       cepOperator,
+                       keySelector,
+                       BasicTypeInfo.INT_TYPE_INFO); // key type matches TestKeySelector's Integer key
+       }
+
+       public static SelectCepOperator<Event, Integer, Map<String, 
List<Event>>> getKeyedCepOpearator(
+               boolean isProcessingTime,
+               NFACompiler.NFAFactory<Event> nfaFactory) {
+               return getKeyedCepOpearator(isProcessingTime, nfaFactory, 
IntSerializer.INSTANCE, null); // Integer keys, no event comparator
+       }
+
+       public static SelectCepOperator<Event, Integer, Map<String, 
List<Event>>> getKeyedCepOpearator(
+               boolean isProcessingTime,
+               NFACompiler.NFAFactory<Event> nfaFactory,
+               EventComparator<Event> comparator) {
+               return getKeyedCepOpearator(isProcessingTime, nfaFactory, 
IntSerializer.INSTANCE, comparator); // Integer keys, caller-supplied comparator
+       }
+
+       public static <K> SelectCepOperator<Event, K, Map<String, List<Event>>> 
getKeyedCepOpearator(
+               boolean isProcessingTime,
+               NFACompiler.NFAFactory<Event> nfaFactory,
+               TypeSerializer<K> keySerializer,
+               EventComparator<Event> comparator) {
+
+               return getKeyedCepOpearator(isProcessingTime, nfaFactory, 
keySerializer, true, comparator); // migratingFromOldKeyedOperator = true
+       }
+
+       public static <K> SelectCepOperator<Event, K, Map<String, List<Event>>> 
getKeyedCepOpearator(
+               boolean isProcessingTime,
+               NFACompiler.NFAFactory<Event> nfaFactory,
+               TypeSerializer<K> keySerializer) {
+
+               return getKeyedCepOpearator(isProcessingTime, nfaFactory, 
keySerializer, true, null); // migratingFromOldKeyedOperator = true, no event comparator
+       }
+
+       public static <K> SelectCepOperator<Event, K, Map<String, List<Event>>> 
getKeyedCepOpearator(
+               boolean isProcessingTime,
+               NFACompiler.NFAFactory<Event> nfaFactory,
+               TypeSerializer<K> keySerializer,
+               boolean migratingFromOldKeyedOperator,
+               EventComparator<Event> comparator) {
+               return new SelectCepOperator<>(
+                       Event.createTypeSerializer(),
+                       isProcessingTime,
+                       keySerializer,
+                       nfaFactory,
+                       migratingFromOldKeyedOperator,
+                       comparator,
+                       new PatternSelectFunction<Event, Map<String, 
List<Event>>>() {
+                               @Override
+                               public Map<String, List<Event>> 
select(Map<String, List<Event>> pattern) throws Exception {
+                                       return pattern; // identity select: hand the match back unchanged
+                               }
+                       });
+       }
+
+       private CepOperatorTestUtilities() { // static helpers only; not meant to be instantiated
+       }
+}

Reply via email to