[FLINK-2105] Implement Sort-Merge Outer Join algorithm

This closes #907


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/941ac6df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/941ac6df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/941ac6df

Branch: refs/heads/master
Commit: 941ac6dfd446d8e97e2fe2f589164978602adf94
Parents: df9f481
Author: r-pogalz <r.pog...@campus.tu-berlin.de>
Authored: Mon Aug 3 12:59:48 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Aug 4 23:51:27 2015 +0200

----------------------------------------------------------------------
 .../operators/sort/AbstractMergeIterator.java   |  58 +--
 .../sort/AbstractMergeOuterJoinIterator.java    | 189 ++++++++
 .../sort/NonReusingMergeOuterJoinIterator.java  |  60 +++
 .../sort/ReusingMergeOuterJoinIterator.java     |  63 +++
 ...bstractSortMergeOuterJoinIteratorITCase.java | 462 +++++++++++++++++++
 ...ReusingSortMergeInnerJoinIteratorITCase.java |   4 +-
 ...ReusingSortMergeOuterJoinIteratorITCase.java |  82 ++++
 ...ReusingSortMergeInnerJoinIteratorITCase.java |   4 +-
 ...ReusingSortMergeOuterJoinIteratorITCase.java |  82 ++++
 .../runtime/operators/testutils/Match.java      |   2 +-
 .../testutils/MatchRemovingJoiner.java          |  58 +++
 .../testutils/MatchRemovingMatcher.java         |  58 ---
 12 files changed, 1030 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
index 9a61c14..c01afc7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
@@ -115,20 +115,20 @@ public abstract class AbstractMergeIterator<T1, T2, O> 
implements JoinTaskIterat
        }
 
        /**
-        * Calls the <code>JoinFunction#match()</code> method for all two 
key-value pairs that share the same key and come
-        * from different inputs. The output of the <code>match()</code> method 
is forwarded.
+        * Calls the <code>JoinFunction#join()</code> method for all two 
key-value pairs that share the same key and come
+        * from different inputs. The output of the <code>join()</code> method 
is forwarded.
         * <p>
         * This method first zig-zags between the two sorted inputs in order to 
find a common
-        * key, and then calls the match stub with the cross product of the 
values.
+        * key, and then calls the join stub with the cross product of the 
values.
         *
         * @throws Exception Forwards all exceptions from the user code and the 
I/O system.
         * @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
         */
        @Override
-       public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, 
O> matchFunction, final Collector<O> collector)
+       public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, 
O> joinFunction, final Collector<O> collector)
                        throws Exception;
 
-       protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> 
values2, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) 
throws Exception {
+       protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> 
values2, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) 
throws Exception {
                final T1 firstV1 = values1.next();
                final T2 firstV2 = values2.next();
 
@@ -143,23 +143,23 @@ public abstract class AbstractMergeIterator<T1, T2, O> 
implements JoinTaskIterat
                        if (v2HasNext) {
                                // both sides contain more than one value
                                // TODO: Decide which side to spill and which 
to block!
-                               crossMwithNValues(firstV1, values1, firstV2, 
values2, matchFunction, collector);
+                               crossMwithNValues(firstV1, values1, firstV2, 
values2, joinFunction, collector);
                        } else {
-                               crossSecond1withNValues(firstV2, firstV1, 
values1, matchFunction, collector);
+                               crossSecond1withNValues(firstV2, firstV1, 
values1, joinFunction, collector);
                        }
                } else {
                        if (v2HasNext) {
-                               crossFirst1withNValues(firstV1, firstV2, 
values2, matchFunction, collector);
+                               crossFirst1withNValues(firstV1, firstV2, 
values2, joinFunction, collector);
                        } else {
                                // both sides contain only one value
-                               matchFunction.join(firstV1, firstV2, collector);
+                               joinFunction.join(firstV1, firstV2, collector);
                        }
                }
        }
 
        /**
         * Crosses a single value from the first input with N values, all 
sharing a common key.
-        * Effectively realizes a <i>1:N</i> match (join).
+        * Effectively realizes a <i>1:N</i> join.
         *
         * @param val1      The value form the <i>1</i> side.
         * @param firstValN The first of the values from the <i>N</i> side.
@@ -167,21 +167,21 @@ public abstract class AbstractMergeIterator<T1, T2, O> 
implements JoinTaskIterat
         * @throws Exception Forwards all exceptions thrown by the stub.
         */
        private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
-                                                                               
final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, 
final Collector<O> collector)
+                                                                               
final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> joinFunction, final 
Collector<O> collector)
                        throws Exception {
                T1 copy1 = createCopy(serializer1, val1, this.copy1);
-               matchFunction.join(copy1, firstValN, collector);
+               joinFunction.join(copy1, firstValN, collector);
 
-               // set copy and match first element
+               // set copy and join first element
                boolean more = true;
                do {
                        final T2 nRec = valsN.next();
 
                        if (valsN.hasNext()) {
                                copy1 = createCopy(serializer1, val1, 
this.copy1);
-                               matchFunction.join(copy1, nRec, collector);
+                               joinFunction.join(copy1, nRec, collector);
                        } else {
-                               matchFunction.join(val1, nRec, collector);
+                               joinFunction.join(val1, nRec, collector);
                                more = false;
                        }
                }
@@ -190,7 +190,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> 
implements JoinTaskIterat
 
        /**
         * Crosses a single value from the second side with N values, all 
sharing a common key.
-        * Effectively realizes a <i>N:1</i> match (join).
+        * Effectively realizes a <i>N:1</i> join.
         *
         * @param val1      The value form the <i>1</i> side.
         * @param firstValN The first of the values from the <i>N</i> side.
@@ -198,20 +198,20 @@ public abstract class AbstractMergeIterator<T1, T2, O> 
implements JoinTaskIterat
         * @throws Exception Forwards all exceptions thrown by the stub.
         */
        private void crossSecond1withNValues(T2 val1, T1 firstValN,
-                                                                               
Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> 
collector) throws Exception {
+                                                                               
Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> 
collector) throws Exception {
                T2 copy2 = createCopy(serializer2, val1, this.copy2);
-               matchFunction.join(firstValN, copy2, collector);
+               joinFunction.join(firstValN, copy2, collector);
 
-               // set copy and match first element
+               // set copy and join first element
                boolean more = true;
                do {
                        final T1 nRec = valsN.next();
 
                        if (valsN.hasNext()) {
                                copy2 = createCopy(serializer2, val1, 
this.copy2);
-                               matchFunction.join(nRec, copy2, collector);
+                               joinFunction.join(nRec, copy2, collector);
                        } else {
-                               matchFunction.join(nRec, val1, collector);
+                               joinFunction.join(nRec, val1, collector);
                                more = false;
                        }
                }
@@ -220,7 +220,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> 
implements JoinTaskIterat
 
        private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
                                                                        final 
T2 firstV2, final Iterator<T2> blockVals,
-                                                                       final 
FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws 
Exception {
+                                                                       final 
FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector) throws 
Exception {
                // ==================================================
                // We have one first (head) element from both inputs (firstV1 
and firstV2)
                // We have an iterator for both inputs.
@@ -237,13 +237,13 @@ public abstract class AbstractMergeIterator<T1, T2, O> 
implements JoinTaskIterat
                // 5) cross the head of the spilling side with the next block
                // 6) cross the spilling iterator with the next block.
 
-               // match the first values first
+               // join the first values first
                T1 copy1 = this.createCopy(serializer1, firstV1, this.copy1);
                T2 blockHeadCopy = this.createCopy(serializer2, firstV2, 
this.blockHeadCopy);
                T1 spillHeadCopy = null;
 
                // --------------- 1) Cross the heads -------------------
-               matchFunction.join(copy1, firstV2, collector);
+               joinFunction.join(copy1, firstV2, collector);
 
                // for the remaining values, we do a block-nested-loops join
                SpillingResettableIterator<T1> spillIt = null;
@@ -256,7 +256,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> 
implements JoinTaskIterat
                        while (this.blockIt.hasNext()) {
                                final T2 nextBlockRec = this.blockIt.next();
                                copy1 = this.createCopy(serializer1, firstV1, 
this.copy1);
-                               matchFunction.join(copy1, nextBlockRec, 
collector);
+                               joinFunction.join(copy1, nextBlockRec, 
collector);
                        }
                        this.blockIt.reset();
 
@@ -286,7 +286,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> 
implements JoinTaskIterat
 
                                // -------- 3) cross the iterator of the 
spilling side with the head of the block side --------
                                T2 copy2 = this.createCopy(serializer2, 
blockHeadCopy, this.copy2);
-                               matchFunction.join(copy1, copy2, collector);
+                               joinFunction.join(copy1, copy2, collector);
 
                                // -------- 4) cross the iterator of the 
spilling side with the first block --------
                                while (this.blockIt.hasNext()) {
@@ -294,7 +294,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> 
implements JoinTaskIterat
 
                                        // get instances of key and block value
                                        copy1 = this.createCopy(serializer1, 
nextSpillVal, this.copy1);
-                                       matchFunction.join(copy1, nextBlockRec, 
collector);
+                                       joinFunction.join(copy1, nextBlockRec, 
collector);
                                }
                                // reset block iterator
                                this.blockIt.reset();
@@ -316,7 +316,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> 
implements JoinTaskIterat
                                while (this.blockIt.hasNext()) {
                                        copy1 = this.createCopy(serializer1, 
spillHeadCopy, this.copy1);
                                        final T2 nextBlockVal = blockIt.next();
-                                       matchFunction.join(copy1, nextBlockVal, 
collector);
+                                       joinFunction.join(copy1, nextBlockVal, 
collector);
                                }
                                this.blockIt.reset();
 
@@ -329,7 +329,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> 
implements JoinTaskIterat
                                                // get instances of key and 
block value
                                                final T2 nextBlockVal = 
this.blockIt.next();
                                                copy1 = 
this.createCopy(serializer1, nextSpillVal, this.copy1);
-                                               matchFunction.join(copy1, 
nextBlockVal, collector);
+                                               joinFunction.join(copy1, 
nextBlockVal, collector);
                                        }
 
                                        // reset block iterator

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
new file mode 100644
index 0000000..01b371e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * <p>
+ * The inputs are expected to be sorted on the join key: the iterator zig-zags over both inputs key group by
+ * key group. Key groups that occur in both inputs are joined via {@link #crossMatchingGroup}. Depending on the
+ * {@link OuterJoinType}, key groups without a partner in the other input are either passed to the join
+ * function with {@code null} for the missing side or skipped.
+ * <p>
+ * Concrete subclasses (e.g. NonReusingMergeOuterJoinIterator) supply {@code createKeyGroupedIterator} and
+ * {@code createCopy}, which determine whether record objects are reused.
+ */
+public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends 
AbstractMergeIterator<T1, T2, O> {
+       /**
+        * Selects which unmatched records are emitted, joined with {@code null} for the missing side:
+        * LEFT emits unmatched records of the first input, RIGHT those of the second input,
+        * and FULL those of both inputs.
+        */
+       public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+       private final OuterJoinType outerJoinType;
+
+       private boolean initialized = false; // set by the first callWithNextKey() call, which reads the first keys
+       private boolean it1Empty = false; // true once the first input has no more key groups
+       private boolean it2Empty = false; // true once the second input has no more key groups
+
+       /**
+        * Creates a sort-merge outer join iterator. Apart from {@code outerJoinType}, all arguments are
+        * passed unchanged to the {@link AbstractMergeIterator} constructor.
+        *
+        * @param outerJoinType The kind of outer join (LEFT, RIGHT or FULL) to compute.
+        * @throws MemoryAllocationException Forwarded from the superclass constructor.
+        */
+       public AbstractMergeOuterJoinIterator(
+                       OuterJoinType outerJoinType,
+                       MutableObjectIterator<T1> input1,
+                       MutableObjectIterator<T2> input2,
+                       TypeSerializer<T1> serializer1, TypeComparator<T1> 
comparator1,
+                       TypeSerializer<T2> serializer2, TypeComparator<T2> 
comparator2,
+                       TypePairComparator<T1, T2> pairComparator,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       int numMemoryPages,
+                       AbstractInvokable parentTask)
+                       throws MemoryAllocationException {
+               super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
+               this.outerJoinType = outerJoinType;
+       }
+
+       /**
+        * Calls the <code>JoinFunction#join()</code> method for all pairs of values that share the same key and
+        * come from different inputs. Furthermore, depending on the outer join type (LEFT, RIGHT, FULL), all values
+        * for which no matching partner exists in the other input are joined with {@code null}.
+        * The output of the <code>join()</code> method is forwarded.
+        * <p>
+        * Each call hands at most one key to the join function: either a key that occurs in both inputs (the
+        * cross product of both key groups is joined), or a key group of one input that is joined with
+        * {@code null}. Key groups without a partner that the outer join type does not preserve are skipped.
+        *
+        * @param joinFunction The user function that is called for every joined pair.
+        * @param collector The collector that receives the output of the join function.
+        * @return {@code true} if a key group was passed to the join function and the method should be called
+        *         again; {@code false} if both inputs are exhausted, or if the remaining records of one input
+        *         cannot produce output for this outer join type (they are consumed before returning).
+        * @throws Exception Forwards all exceptions from the user code and the I/O system.
+        * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
+        */
+       @Override
+       public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> 
joinFunction, final Collector<O> collector) throws Exception {
+               if (!initialized) {
+                       //first call: position both inputs on their first key group and remember which inputs are empty
+                       it1Empty = !this.iterator1.nextKey();
+                       it2Empty = !this.iterator2.nextKey();
+                       initialized = true;
+               }
+
+               if (it1Empty && it2Empty) {
+                       return false;
+               } else if (it2Empty) {
+                       if (outerJoinType == OuterJoinType.LEFT || 
outerJoinType == OuterJoinType.FULL) {
+                               
joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector);
+                               it1Empty = !iterator1.nextKey();
+                               return true;
+                       } else {
+                               //right outer join: the remaining left key groups have no partner, so consume and drop them
+                               while (iterator1.nextKey()) ;
+                               it1Empty = true;
+                               return false;
+                       }
+               } else if (it1Empty) {
+                       if (outerJoinType == OuterJoinType.RIGHT || 
outerJoinType == OuterJoinType.FULL) {
+                               
joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector);
+                               it2Empty = !iterator2.nextKey();
+                               return true;
+                       } else {
+                               //left outer join: the remaining right key groups have no partner, so consume and drop them
+                               while (iterator2.nextKey()) ;
+                               it2Empty = true;
+                               return false;
+                       }
+               } else {
+                       final TypePairComparator<T1, T2> comparator = 
super.pairComparator;
+                       comparator.setReference(this.iterator1.getCurrent());
+                       T2 current2 = this.iterator2.getCurrent();
+
+                       // zig zag: skip or emit unmatched key groups until both inputs are positioned on the same key
+                       while (true) {
+                               // determine the relation between the (possibly 
composite) keys
+                               final int comp = 
comparator.compareToReference(current2);
+
+                               if (comp == 0) {
+                                       break;
+                               }
+
+                               if (comp < 0) {
+                                       //right key < left key
+                                       if (outerJoinType == 
OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) {
+                                               //join right key values with 
null in case of right or full outer join
+                                               
joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector);
+                                               it2Empty = !iterator2.nextKey();
+                                               return true;
+                                       } else {
+                                               //skip this right key if it is 
a left outer join
+                                               if (!this.iterator2.nextKey()) {
+                                                       //if right side is 
empty, join current left key values with null
+                                                       
joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector);
+                                                       it1Empty = 
!iterator1.nextKey();
+                                                       it2Empty = true;
+                                                       return true;
+                                               }
+                                               current2 = 
this.iterator2.getCurrent();
+                                       }
+                               } else {
+                                       //right key > left key
+                                       if (outerJoinType == OuterJoinType.LEFT 
|| outerJoinType == OuterJoinType.FULL) {
+                                               //join left key values with 
null in case of left or full outer join
+                                               
joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector);
+                                               it1Empty = !iterator1.nextKey();
+                                               return true;
+                                       } else {
+                                               //skip this left key if it is a 
right outer join
+                                               if (!this.iterator1.nextKey()) {
+                                                       //if left side is 
empty, join current right key values with null
+                                                       
joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector);
+                                                       it1Empty = true;
+                                                       it2Empty = 
!iterator2.nextKey();
+                                                       return true;
+                                               }
+                                               
comparator.setReference(this.iterator1.getCurrent());
+                                       }
+                               }
+                       }
+
+                       // here, we have a common key! call the join function 
with the cross product of the
+                       // values
+                       final Iterator<T1> values1 = this.iterator1.getValues();
+                       final Iterator<T2> values2 = this.iterator2.getValues();
+
+                       crossMatchingGroup(values1, values2, joinFunction, 
collector);
+                       it1Empty = !iterator1.nextKey();
+                       it2Empty = !iterator2.nextKey();
+                       return true;
+               }
+       }
+       /**
+        * Calls the join function with {@code null} as the second argument for every value of the given
+        * (left-side) key group. Each value is first copied via {@code createCopy}, and the field
+        * {@code this.copy1} is updated with the instance that {@code createCopy} returns.
+        */
+       private void joinLeftKeyValuesWithNull(Iterator<T1> values, 
FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws 
Exception {
+               while (values.hasNext()) {
+                       T1 next = values.next();
+                       this.copy1 = createCopy(serializer1, next, copy1);
+                       joinFunction.join(copy1, null, collector);
+               }
+       }
+       /**
+        * Calls the join function with {@code null} as the first argument for every value of the given
+        * (right-side) key group. Each value is first copied via {@code createCopy}, and the field
+        * {@code this.copy2} is updated with the instance that {@code createCopy} returns.
+        */
+       private void joinRightKeyValuesWithNull(Iterator<T2> values, 
FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws 
Exception {
+               while (values.hasNext()) {
+                       T2 next = values.next();
+                       this.copy2 = createCopy(serializer2, next, copy2);
+                       joinFunction.join(null, copy2, collector);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
new file mode 100644
index 0000000..ac49ece
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.util.MutableObjectIterator;
+/**
+ * Sort-merge outer join iterator that does not reuse record objects: key groups are read with a
+ * {@link NonReusingKeyGroupedIterator}, and {@link #createCopy} always returns a fresh copy.
+ * The join logic itself is implemented in {@link AbstractMergeOuterJoinIterator}.
+ */
+public class NonReusingMergeOuterJoinIterator<T1, T2, O> extends 
AbstractMergeOuterJoinIterator<T1, T2, O> {
+
+       public NonReusingMergeOuterJoinIterator(
+                       OuterJoinType outerJoinType,
+                       MutableObjectIterator<T1> input1,
+                       MutableObjectIterator<T2> input2,
+                       TypeSerializer<T1> serializer1, TypeComparator<T1> 
comparator1,
+                       TypeSerializer<T2> serializer2, TypeComparator<T2> 
comparator2,
+                       TypePairComparator<T1, T2> pairComparator,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       int numMemoryPages,
+                       AbstractInvokable parentTask)
+                       throws MemoryAllocationException {
+               super(outerJoinType, input1, input2, serializer1, comparator1, 
serializer2, comparator2, pairComparator, memoryManager, ioManager, 
numMemoryPages, parentTask);
+       }
+       /**
+        * Groups the input by key using a {@link NonReusingKeyGroupedIterator}; the {@code serializer}
+        * argument is not used by this implementation.
+        */
+       @Override
+       protected <T> KeyGroupedIterator<T> 
createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> 
serializer, TypeComparator<T> comparator) {
+               return new NonReusingKeyGroupedIterator<T>(input, comparator);
+       }
+       /**
+        * Returns a new copy of {@code value} created by the serializer; the {@code reuse} instance is ignored.
+        */
+       @Override
+       protected <T> T createCopy(TypeSerializer<T> serializer, T value, T 
reuse) {
+               return serializer.copy(value);
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
new file mode 100644
index 0000000..0cefbc5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * Sort-merge outer-join iterator variant that re-uses record instances: copies are made with
+ * {@code serializer.copy(value, reuse)} and grouping uses a {@link ReusingKeyGroupedIterator}.
+ */
+public class ReusingMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeOuterJoinIterator<T1, T2, O> {
+
+       /**
+        * Creates the iterator and pre-allocates the instances held in the inherited fields
+        * {@code copy1}, {@code spillHeadCopy}, {@code copy2} and {@code blockHeadCopy}.
+        * NOTE(review): presumably these are the {@code reuse} targets handed to
+        * {@link #createCopy} by the superclass - confirm in AbstractMergeIterator.
+        */
+       public ReusingMergeOuterJoinIterator(
+                       OuterJoinType outerJoinType,
+                       MutableObjectIterator<T1> input1,
+                       MutableObjectIterator<T2> input2,
+                       TypeSerializer<T1> serializer1,
+                       TypeComparator<T1> comparator1,
+                       TypeSerializer<T2> serializer2,
+                       TypeComparator<T2> comparator2,
+                       TypePairComparator<T1, T2> pairComparator,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       int numMemoryPages,
+                       AbstractInvokable parentTask)
+                       throws MemoryAllocationException {
+               super(outerJoinType,
+                               input1, input2,
+                               serializer1, comparator1,
+                               serializer2, comparator2,
+                               pairComparator,
+                               memoryManager, ioManager, numMemoryPages,
+                               parentTask);
+
+               // instances for the first input's record type
+               this.copy1 = serializer1.createInstance();
+               this.spillHeadCopy = serializer1.createInstance();
+               // instances for the second input's record type
+               this.copy2 = serializer2.createInstance();
+               this.blockHeadCopy = serializer2.createInstance();
+       }
+
+       /**
+        * Wraps {@code input} in a {@link ReusingKeyGroupedIterator}, which needs the serializer in
+        * addition to the key comparator.
+        */
+       @Override
+       protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(
+                       MutableObjectIterator<T> input,
+                       TypeSerializer<T> serializer,
+                       TypeComparator<T> comparator) {
+               KeyGroupedIterator<T> grouped = new ReusingKeyGroupedIterator<T>(input, serializer, comparator);
+               return grouped;
+       }
+
+       /**
+        * Copies {@code value} into {@code reuse} with {@code serializer.copy(value, reuse)} and
+        * returns the serializer's result.
+        */
+       @Override
+       protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) {
+               final T target = serializer.copy(value, reuse);
+               return target;
+       }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
new file mode 100644
index 0000000..1fbe025
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import 
org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import org.apache.flink.runtime.operators.testutils.*;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleConstantValueIterator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGeneratorIterator;
+import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * Shared test logic for the sort-merge outer-join iterators. Concrete subclasses supply the
+ * iterator implementation through {@link #createOuterJoinIterator}.
+ */
+public abstract class AbstractSortMergeOuterJoinIteratorITCase {
+
+       // total memory
+       private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+
+       // memory pages handed to the outer-join iterators; deliberately small so that the
+       // block-nested-loops fall-back needs to spill for keys with many duplicates
+       private static final int PAGES_FOR_BNLJN = 2;
+
+       // random seeds for the left and right input data generators
+       private static final long SEED1 = 561349061987311L;
+
+       private static final long SEED2 = 231434613412342L;
+
+       // dummy abstract task
+       private final AbstractInvokable parentTask = new DummyInvokable();
+
+       private IOManager ioManager;
+       private MemoryManager memoryManager;
+
+       private TupleTypeInfo<Tuple2<String, String>> typeInfo1;
+       private TupleTypeInfo<Tuple2<String, Integer>> typeInfo2;
+       private TupleSerializer<Tuple2<String, String>> serializer1;
+       private TupleSerializer<Tuple2<String, Integer>> serializer2;
+       private TypeComparator<Tuple2<String, String>> comparator1;
+       private TypeComparator<Tuple2<String, Integer>> comparator2;
+       private TypePairComparator<Tuple2<String, String>, Tuple2<String, Integer>> pairComp;
+
+
+       @Before
+       public void beforeTest() {
+               ExecutionConfig config = new ExecutionConfig();
+               config.disableObjectReuse();
+
+               typeInfo1 = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
+               typeInfo2 = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class);
+               serializer1 = typeInfo1.createSerializer(config);
+               serializer2 = typeInfo2.createSerializer(config);
+               comparator1 = typeInfo1.createComparator(new int[]{0}, new boolean[]{true}, 0, config);
+               comparator2 = typeInfo2.createComparator(new int[]{0}, new boolean[]{true}, 0, config);
+               pairComp = new GenericPairComparator<Tuple2<String, String>, Tuple2<String, Integer>>(comparator1, comparator2);
+
+               // shared by every test method; shut down and checked for leaks in afterTest()
+               this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+               this.ioManager = new IOManagerAsync();
+       }
+
+       @After
+       public void afterTest() {
+               if (this.ioManager != null) {
+                       this.ioManager.shutdown();
+                       if (!this.ioManager.isProperlyShutDown()) {
+                               Assert.fail("I/O manager failed to properly shut down.");
+                       }
+                       this.ioManager = null;
+               }
+
+               if (this.memoryManager != null) {
+                       Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+                                       this.memoryManager.verifyEmpty());
+                       this.memoryManager.shutdown();
+                       this.memoryManager = null;
+               }
+       }
+
+       protected void testFullOuterWithSample() throws Exception {
+               CollectionIterator<Tuple2<String, String>> input1 = createSampleInput1();
+               CollectionIterator<Tuple2<String, Integer>> input2 = createSampleInput2();
+
+               List<Tuple4<String, String, String, Object>> actual = computeOuterJoin(input1, input2, OuterJoinType.FULL);
+
+               List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+                               new Tuple4<String, String, String, Object>(null, null, "Allison", 100),
+                               new Tuple4<String, String, String, Object>("Jack", "Engineering", "Jack", 200),
+                               new Tuple4<String, String, String, Object>("Tim", "Sales", null, null),
+                               new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 150),
+                               new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 250)
+               );
+
+               Assert.assertEquals(expected, actual);
+       }
+
+       protected void testLeftOuterWithSample() throws Exception {
+               CollectionIterator<Tuple2<String, String>> input1 = createSampleInput1();
+               CollectionIterator<Tuple2<String, Integer>> input2 = createSampleInput2();
+
+               List<Tuple4<String, String, String, Object>> actual = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
+
+               List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+                               new Tuple4<String, String, String, Object>("Jack", "Engineering", "Jack", 200),
+                               new Tuple4<String, String, String, Object>("Tim", "Sales", null, null),
+                               new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 150),
+                               new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 250)
+               );
+
+               Assert.assertEquals(expected, actual);
+       }
+
+       protected void testRightOuterWithSample() throws Exception {
+               CollectionIterator<Tuple2<String, String>> input1 = createSampleInput1();
+               CollectionIterator<Tuple2<String, Integer>> input2 = createSampleInput2();
+
+               List<Tuple4<String, String, String, Object>> actual = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
+
+               List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+                               new Tuple4<String, String, String, Object>(null, null, "Allison", 100),
+                               new Tuple4<String, String, String, Object>("Jack", "Engineering", "Jack", 200),
+                               new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 150),
+                               new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 250)
+               );
+
+               Assert.assertEquals(expected, actual);
+       }
+
+       protected void testRightSideEmpty() throws Exception {
+               CollectionIterator<Tuple2<String, String>> input1 = createSampleInput1();
+               CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of();
+
+               List<Tuple4<String, String, String, Object>> actualLeft = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
+               List<Tuple4<String, String, String, Object>> actualRight = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
+               List<Tuple4<String, String, String, Object>> actualFull = computeOuterJoin(input1, input2, OuterJoinType.FULL);
+
+               List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+                               new Tuple4<String, String, String, Object>("Jack", "Engineering", null, null),
+                               new Tuple4<String, String, String, Object>("Tim", "Sales", null, null),
+                               new Tuple4<String, String, String, Object>("Zed", "HR", null, null)
+               );
+
+               Assert.assertEquals(expected, actualLeft);
+               Assert.assertEquals(expected, actualFull);
+               Assert.assertEquals(Collections.<Tuple4<String, String, String, Object>>emptyList(), actualRight);
+       }
+
+       protected void testLeftSideEmpty() throws Exception {
+               CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of();
+               CollectionIterator<Tuple2<String, Integer>> input2 = createSampleInput2();
+
+               List<Tuple4<String, String, String, Object>> actualLeft = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
+               List<Tuple4<String, String, String, Object>> actualRight = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
+               List<Tuple4<String, String, String, Object>> actualFull = computeOuterJoin(input1, input2, OuterJoinType.FULL);
+
+               List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+                               new Tuple4<String, String, String, Object>(null, null, "Allison", 100),
+                               new Tuple4<String, String, String, Object>(null, null, "Jack", 200),
+                               new Tuple4<String, String, String, Object>(null, null, "Zed", 150),
+                               new Tuple4<String, String, String, Object>(null, null, "Zed", 250)
+               );
+
+               Assert.assertEquals(Collections.<Tuple4<String, String, String, Object>>emptyList(), actualLeft);
+               Assert.assertEquals(expected, actualRight);
+               Assert.assertEquals(expected, actualFull);
+       }
+
+       /** Left sample input: three records with the distinct keys "Jack", "Tim" and "Zed". */
+       private static CollectionIterator<Tuple2<String, String>> createSampleInput1() {
+               return CollectionIterator.of(
+                               new Tuple2<String, String>("Jack", "Engineering"),
+                               new Tuple2<String, String>("Tim", "Sales"),
+                               new Tuple2<String, String>("Zed", "HR")
+               );
+       }
+
+       /** Right sample input: keys "Allison" and "Jack" once, and "Zed" twice. */
+       private static CollectionIterator<Tuple2<String, Integer>> createSampleInput2() {
+               return CollectionIterator.of(
+                               new Tuple2<String, Integer>("Allison", 100),
+                               new Tuple2<String, Integer>("Jack", 200),
+                               new Tuple2<String, Integer>("Zed", 150),
+                               new Tuple2<String, Integer>("Zed", 250)
+               );
+       }
+
+       /**
+        * Rewinds both inputs, runs the outer join of the given type over them and returns all
+        * records produced by {@link SimpleTupleJoinFunction}, in emission order.
+        */
+       private List<Tuple4<String, String, String, Object>> computeOuterJoin(
+                       ResettableMutableObjectIterator<Tuple2<String, String>> input1,
+                       ResettableMutableObjectIterator<Tuple2<String, Integer>> input2,
+                       OuterJoinType outerJoinType) throws Exception {
+               // the same inputs are joined several times per test, so rewind them first
+               input1.reset();
+               input2.reset();
+
+               AbstractMergeOuterJoinIterator<Tuple2<String, String>, Tuple2<String, Integer>, Tuple4<String, String, String, Object>> iterator =
+                               createOuterJoinIterator(outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2,
+                                               pairComp, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+
+               List<Tuple4<String, String, String, Object>> actual = new ArrayList<Tuple4<String, String, String, Object>>();
+               ListCollector<Tuple4<String, String, String, Object>> collector = new ListCollector<Tuple4<String, String, String, Object>>(actual);
+               SimpleTupleJoinFunction joinFunction = new SimpleTupleJoinFunction();
+
+               iterator.open();
+               try {
+                       while (iterator.callWithNextKey(joinFunction, collector)) {
+                               // each call joins one key group and emits into the collector
+                       }
+               } finally {
+                       // always return the iterator's memory, otherwise afterTest() reports a leak
+                       iterator.close();
+               }
+
+               return actual;
+       }
+
+       protected void testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType outerJoinType, int input1Size, int input1Duplicates, int input1ValueLength,
+                       float input1KeyDensity, int input2Size, int input2Duplicates, int input2ValueLength, float input2KeyDensity) {
+               @SuppressWarnings("unchecked") // Tuple2.class cannot carry the type arguments
+               TypeSerializer<Tuple2<Integer, String>> serializer1 = new TupleSerializer<Tuple2<Integer, String>>(
+                               (Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
+                               new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+               @SuppressWarnings("unchecked") // Tuple2.class cannot carry the type arguments
+               TypeSerializer<Tuple2<Integer, String>> serializer2 = new TupleSerializer<Tuple2<Integer, String>>(
+                               (Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
+                               new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+               TypeComparator<Tuple2<Integer, String>> comparator1 = new TupleComparator<Tuple2<Integer, String>>(
+                               new int[]{0},
+                               new TypeComparator<?>[] { new IntComparator(true) },
+                               new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+               TypeComparator<Tuple2<Integer, String>> comparator2 = new TupleComparator<Tuple2<Integer, String>>(
+                               new int[]{0},
+                               new TypeComparator<?>[] { new IntComparator(true) },
+                               new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+
+               TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator =
+                               new GenericPairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>>(comparator1, comparator2);
+
+               // NOTE: the memory and I/O managers created in beforeTest() are used here. Creating
+               // new ones would leak the originals, and afterTest() would only check the new ones.
+
+               final int DUPLICATE_KEY = 13;
+
+               try {
+                       final TupleGenerator generator1 = new TupleGenerator(SEED1, 500, input1KeyDensity, input1ValueLength, KeyMode.SORTED_SPARSE, ValueMode.RANDOM_LENGTH, null);
+                       final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, input2KeyDensity, input2ValueLength, KeyMode.SORTED_SPARSE, ValueMode.RANDOM_LENGTH, null);
+
+                       final TupleGeneratorIterator gen1Iter = new TupleGeneratorIterator(generator1, input1Size);
+                       final TupleGeneratorIterator gen2Iter = new TupleGeneratorIterator(generator2, input2Size);
+
+                       final TupleConstantValueIterator const1Iter = new TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", input1Duplicates);
+                       final TupleConstantValueIterator const2Iter = new TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", input2Duplicates);
+
+                       final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+
+                       final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+
+                       MutableObjectIterator<Tuple2<Integer, String>> input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
+                       MutableObjectIterator<Tuple2<Integer, String>> input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
+
+                       // collect expected data
+                       final Map<Integer, Collection<Match>> expectedMatchesMap = joinValues(
+                                       collectData(input1),
+                                       collectData(input2),
+                                       outerJoinType);
+
+                       // re-create the whole thing for actual processing
+
+                       // reset the generators and iterators
+                       generator1.reset();
+                       generator2.reset();
+                       const1Iter.reset();
+                       const2Iter.reset();
+                       gen1Iter.reset();
+                       gen2Iter.reset();
+
+                       inList1.clear();
+                       inList1.add(gen1Iter);
+                       inList1.add(const1Iter);
+
+                       inList2.clear();
+                       inList2.add(gen2Iter);
+                       inList2.add(const2Iter);
+
+                       input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
+                       input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
+
+                       // removes each produced match from expectedMatchesMap
+                       final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
+                                       new MatchRemovingJoiner(expectedMatchesMap);
+
+                       final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
+
+
+                       // we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it
+                       // needs to spill for the duplicate keys
+                       AbstractMergeOuterJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                                       createOuterJoinIterator(
+                                                       outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2,
+                                                       pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+
+                       iterator.open();
+                       try {
+                               while (iterator.callWithNextKey(joinFunction, collector)) {
+                                       // each call joins one key group; the joiner checks off the matches
+                               }
+                       } finally {
+                               // always return the iterator's memory, otherwise afterTest() reports a leak
+                               iterator.close();
+                       }
+
+                       // assert that each expected match was seen
+                       for (Entry<Integer, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
+                               if (!entry.getValue().isEmpty()) {
+                                       Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+                               }
+                       }
+               } catch (Exception e) {
+                       // keep the original exception (and its stack trace) as the cause of the failure
+                       AssertionError failure = new AssertionError("An exception occurred during the test: " + e.getMessage());
+                       failure.initCause(e);
+                       throw failure;
+               }
+       }
+
+       /**
+        * Creates the outer-join iterator under test; concrete subclasses choose the implementation.
+        */
+       protected abstract <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(OuterJoinType outerJoinType,
+                       MutableObjectIterator<T1> input1,
+                       MutableObjectIterator<T2> input2,
+                       TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+                       TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+                       TypePairComparator<T1, T2> pairComparator,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       int numMemoryPages,
+                       AbstractInvokable parentTask) throws Exception;
+
+       // --------------------------------------------------------------------------------------------
+       //                                    Utilities
+       // --------------------------------------------------------------------------------------------
+
+
+       /**
+        * Computes the expected outer-join result per key. The result contains every (left, right)
+        * pair of a key present on both sides. It contains (left, null) for left-only keys unless
+        * the join is RIGHT, and (null, right) for right-only keys when the join is RIGHT or FULL.
+        */
+       private Map<Integer, Collection<Match>> joinValues(
+                       Map<Integer, Collection<String>> leftMap,
+                       Map<Integer, Collection<String>> rightMap,
+                       OuterJoinType outerJoinType) {
+               Map<Integer, Collection<Match>> map = new HashMap<Integer, Collection<Match>>();
+
+               for (Entry<Integer, Collection<String>> leftEntry : leftMap.entrySet()) {
+                       Integer key = leftEntry.getKey();
+                       Collection<String> rightValues = rightMap.get(key);
+
+                       if (outerJoinType == OuterJoinType.RIGHT && rightValues == null) {
+                               // a right outer join drops left records without a join partner
+                               continue;
+                       }
+
+                       Collection<Match> joinedValues = getOrCreateMatches(map, key);
+
+                       for (String leftValue : leftEntry.getValue()) {
+                               if (rightValues != null) {
+                                       for (String rightValue : rightValues) {
+                                               joinedValues.add(new Match(leftValue, rightValue));
+                                       }
+                               } else {
+                                       joinedValues.add(new Match(leftValue, null));
+                               }
+                       }
+               }
+
+               if (outerJoinType == OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) {
+                       for (Entry<Integer, Collection<String>> rightEntry : rightMap.entrySet()) {
+                               Integer key = rightEntry.getKey();
+
+                               if (leftMap.get(key) != null) {
+                                       // keys present on both sides were already joined above
+                                       continue;
+                               }
+
+                               Collection<Match> joinedValues = getOrCreateMatches(map, key);
+
+                               for (String rightValue : rightEntry.getValue()) {
+                                       joinedValues.add(new Match(null, rightValue));
+                               }
+                       }
+               }
+
+               return map;
+       }
+
+       /** Returns the match collection stored for {@code key}, creating an empty one if needed. */
+       private static Collection<Match> getOrCreateMatches(Map<Integer, Collection<Match>> map, Integer key) {
+               Collection<Match> matches = map.get(key);
+               if (matches == null) {
+                       matches = new ArrayList<Match>();
+                       map.put(key, matches);
+               }
+               return matches;
+       }
+
+
+       /** Drains {@code iter} and groups the string values (field 1) by their integer key (field 0). */
+       private Map<Integer, Collection<String>> collectData(MutableObjectIterator<Tuple2<Integer, String>> iter)
+                       throws Exception {
+               final Map<Integer, Collection<String>> map = new HashMap<Integer, Collection<String>>();
+               Tuple2<Integer, String> pair = new Tuple2<Integer, String>();
+
+               while ((pair = iter.next(pair)) != null) {
+                       final Integer key = pair.getField(0);
+
+                       if (!map.containsKey(key)) {
+                               map.put(key, new ArrayList<String>());
+                       }
+
+                       Collection<String> values = map.get(key);
+                       final String value = pair.getField(1);
+                       values.add(value);
+               }
+
+               return map;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
index 7fc3734..6548052 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
@@ -135,7 +135,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase {
                                        collectData(input2));
 
                        final FlatJoinFunction<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
-                                       new 
MatchRemovingMatcher(expectedMatchesMap);
+                                       new 
MatchRemovingJoiner(expectedMatchesMap);
 
                        final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<Tuple2<Integer, String>>();
        
@@ -226,7 +226,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase {
                        input1 = new MergeIterator<Tuple2<Integer, 
String>>(inList1, comparator1.duplicate());
                        input2 = new MergeIterator<Tuple2<Integer, 
String>>(inList2, comparator2.duplicate());
                        
-                       final FlatJoinFunction<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new 
MatchRemovingMatcher(expectedMatchesMap);
+                       final FlatJoinFunction<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new 
MatchRemovingJoiner(expectedMatchesMap);
                        
                        final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<Tuple2<Integer, String>>();
        

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
new file mode 100644
index 0000000..1205bc1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import 
org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+/**
+ * Runs the sort-merge outer join tests defined in {@link AbstractSortMergeOuterJoinIteratorITCase}
+ * against {@link NonReusingMergeOuterJoinIterator}.
+ */
+public class NonReusingSortMergeOuterJoinIteratorITCase extends AbstractSortMergeOuterJoinIteratorITCase {
+
+       /**
+        * Creates the iterator under test, a {@link NonReusingMergeOuterJoinIterator}, forwarding all arguments unchanged.
+        */
+       @Override
+       protected <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(OuterJoinType outerJoinType,
+                       MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2,
+                       TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+                       TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+                       TypePairComparator<T1, T2> pairComparator, MemoryManager memoryManager, IOManager ioManager,
+                       int numMemoryPages, AbstractInvokable parentTask) throws Exception {
+               return new NonReusingMergeOuterJoinIterator(outerJoinType, input1, input2, serializer1, comparator1,
+                               serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+       }
+
+       // The following tests re-declare inherited methods with @Test and delegate to the base implementation.
+
+       @Test
+       @Override
+       public void testFullOuterWithSample() throws Exception {
+               super.testFullOuterWithSample();
+       }
+
+       @Test
+       @Override
+       public void testLeftOuterWithSample() throws Exception {
+               super.testLeftOuterWithSample();
+       }
+
+       @Test
+       @Override
+       public void testRightOuterWithSample() throws Exception {
+               super.testRightOuterWithSample();
+       }
+
+       @Test
+       @Override
+       public void testRightSideEmpty() throws Exception {
+               super.testRightSideEmpty();
+       }
+
+       @Test
+       @Override
+       public void testLeftSideEmpty() throws Exception {
+               super.testLeftSideEmpty();
+       }
+
+       // NOTE(review): the meaning of the numeric arguments below is defined by the inherited
+       // testOuterJoinWithHighNumberOfCommonKeys helper.
+
+       @Test
+       public void testFullOuterJoinWithHighNumberOfCommonKeys() {
+               testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.FULL, 200, 500, 2048, 0.02f, 200, 500, 2048, 0.02f);
+       }
+
+       @Test
+       public void testLeftOuterJoinWithHighNumberOfCommonKeys() {
+               testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.LEFT, 200, 10, 4096, 0.02f, 100, 4000, 2048, 0.02f);
+       }
+
+       @Test
+       public void testRightOuterJoinWithHighNumberOfCommonKeys() {
+               testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.RIGHT, 100, 10, 2048, 0.02f, 200, 4000, 4096, 0.02f);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
index e4eec86..39316e3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
@@ -135,7 +135,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase {
                                collectData(input2));
 
                        final FlatJoinFunction<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
-                                       new 
MatchRemovingMatcher(expectedMatchesMap);
+                                       new 
MatchRemovingJoiner(expectedMatchesMap);
 
                        final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<Tuple2<Integer, String>>();
 
@@ -226,7 +226,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase {
                        input1 = new MergeIterator<Tuple2<Integer, 
String>>(inList1, comparator1.duplicate());
                        input2 = new MergeIterator<Tuple2<Integer, 
String>>(inList2, comparator2.duplicate());
                        
-                       final FlatJoinFunction<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new 
MatchRemovingMatcher(expectedMatchesMap);
+                       final FlatJoinFunction<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new 
MatchRemovingJoiner(expectedMatchesMap);
                        
                        final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<Tuple2<Integer, String>>();
        

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
new file mode 100644
index 0000000..b4fbd80
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import 
org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+/**
+ * Runs the sort-merge outer join tests defined in {@link AbstractSortMergeOuterJoinIteratorITCase}
+ * against {@link ReusingMergeOuterJoinIterator}.
+ */
+public class ReusingSortMergeOuterJoinIteratorITCase extends AbstractSortMergeOuterJoinIteratorITCase {
+
+       /**
+        * Creates the iterator under test, a {@link ReusingMergeOuterJoinIterator}, forwarding all arguments unchanged.
+        */
+       @Override
+       protected <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(OuterJoinType outerJoinType,
+                       MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2,
+                       TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+                       TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+                       TypePairComparator<T1, T2> pairComparator, MemoryManager memoryManager, IOManager ioManager,
+                       int numMemoryPages, AbstractInvokable parentTask) throws Exception {
+               return new ReusingMergeOuterJoinIterator(outerJoinType, input1, input2, serializer1, comparator1,
+                               serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+       }
+
+       // The following tests re-declare inherited methods with @Test and delegate to the base implementation.
+
+       @Test
+       @Override
+       public void testFullOuterWithSample() throws Exception {
+               super.testFullOuterWithSample();
+       }
+
+       @Test
+       @Override
+       public void testLeftOuterWithSample() throws Exception {
+               super.testLeftOuterWithSample();
+       }
+
+       @Test
+       @Override
+       public void testRightOuterWithSample() throws Exception {
+               super.testRightOuterWithSample();
+       }
+
+       @Test
+       @Override
+       public void testRightSideEmpty() throws Exception {
+               super.testRightSideEmpty();
+       }
+
+       @Test
+       @Override
+       public void testLeftSideEmpty() throws Exception {
+               super.testLeftSideEmpty();
+       }
+
+       // NOTE(review): the meaning of the numeric arguments below is defined by the inherited
+       // testOuterJoinWithHighNumberOfCommonKeys helper.
+
+       @Test
+       public void testFullOuterJoinWithHighNumberOfCommonKeys() {
+               testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.FULL, 200, 500, 2048, 0.02f, 200, 500, 2048, 0.02f);
+       }
+
+       @Test
+       public void testLeftOuterJoinWithHighNumberOfCommonKeys() {
+               testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.LEFT, 200, 10, 4096, 0.02f, 100, 4000, 2048, 0.02f);
+       }
+
+       @Test
+       public void testRightOuterJoinWithHighNumberOfCommonKeys() {
+               testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.RIGHT, 100, 10, 2048, 0.02f, 200, 4000, 4096, 0.02f);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
index 539d864..4ac9093 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators.testutils;
 /**
  * Utility class for keeping track of matches in join operator tests.
  *
- * @see org.apache.flink.runtime.operators.testutils.MatchRemovingMatcher
+ * @see MatchRemovingJoiner
  */
 public class Match {
        private final String left;

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java
new file mode 100644
index 0000000..e588d92
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.Collection;
+import java.util.Map;
+
+
+/**
+ * Test {@link FlatJoinFunction} that checks every joined pair against a map of expected {@link Match}es.
+ *
+ * <p>Each call to {@link #join} removes the produced match from the expected matches of its key and
+ * fails the test if the key is unknown or the match was not expected. A key is dropped from the map
+ * once all of its expected matches have been produced, so entries left in the map afterwards were
+ * never produced. Nothing is emitted to the collector.
+ */
+public final class MatchRemovingJoiner implements FlatJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple2<Integer,String>> {
+       private static final long serialVersionUID = 1L;
+
+       /** Expected matches per join key; the caller's map is mutated in place. */
+       private final Map<Integer, Collection<Match>> toRemoveFrom;
+
+       /**
+        * @param map expected matches per key; entries are removed from it while joining
+        */
+       public MatchRemovingJoiner(Map<Integer, Collection<Match>> map) {
+               this.toRemoveFrom = map;
+       }
+
+       /**
+        * Verifies one join result. Either record may be {@code null}; the key is read from the left
+        * record when present, otherwise from the right one.
+        */
+       @Override
+       public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception {
+               final Integer key;
+               final String value1;
+               if (rec1 == null) {
+                       key = rec2.f0;
+                       value1 = null;
+               } else {
+                       key = rec1.f0;
+                       value1 = rec1.f1;
+               }
+               final String value2 = (rec2 == null) ? null : rec2.f1;
+
+               final Collection<Match> expected = this.toRemoveFrom.get(key);
+               if (expected == null) {
+                       Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
+               }
+
+               if (!expected.remove(new Match(value1, value2))) {
+                       Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2);
+               }
+
+               // all expected matches for this key were seen: forget the key
+               if (expected.isEmpty()) {
+                       this.toRemoveFrom.remove(key);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
deleted file mode 100644
index f69b4d7..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.testutils;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-import java.util.Collection;
-import java.util.Map;
-
-
-public final class MatchRemovingMatcher implements 
FlatJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple2<Integer,String>>
 {
-       private static final long serialVersionUID = 1L;
-
-       private final Map<Integer, Collection<Match>> toRemoveFrom;
-
-       public MatchRemovingMatcher(Map<Integer, Collection<Match>> map) {
-               this.toRemoveFrom = map;
-       }
-
-       @Override
-       public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> 
rec2, Collector<Tuple2<Integer, String>> out) throws Exception {
-               final Integer key = rec1 != null ? (Integer) rec1.getField(0) : 
(Integer) rec2.getField(0);
-               final String value1 = rec1 != null ? (String) rec1.getField(1) 
: null;
-               final String value2 = rec2 != null ? (String) rec2.getField(1) 
: null;
-
-               Collection<Match> matches = this.toRemoveFrom.get(key);
-               if (matches == null) {
-                       Assert.fail("Match " + key + " - " + value1 + ":" + 
value2 + " is unexpected.");
-               }
-
-               boolean contained = matches.remove(new Match(value1, value2));
-               if (!contained) {
-                       Assert.fail("Produced match was not contained: " + key 
+ " - " + value1 + ":" + value2);
-               }
-               if (matches.isEmpty()) {
-                       this.toRemoveFrom.remove(key);
-               }
-       }
-}

Reply via email to