[FLINK-5497] [tests] Remove duplicated tests for hash tables

This closes #3089


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53134594
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53134594
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53134594

Branch: refs/heads/master
Commit: 53134594644407d0a3cd691b0e93ae09ff6c8102
Parents: 9f544d8
Author: Alexey Diomin <diomi...@gmail.com>
Authored: Tue Jan 10 22:04:41 2017 +0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Feb 18 19:19:34 2017 +0100

----------------------------------------------------------------------
 .../NonReusingReOpenableHashTableITCase.java    | 421 +-----------------
 .../hash/ReOpenableHashTableITCase.java         | 222 ++++++++++
 .../hash/ReOpenableHashTableTestBase.java       | 193 +++++++++
 .../hash/ReusingReOpenableHashTableITCase.java  | 429 +------------------
 4 files changed, 429 insertions(+), 836 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53134594/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index 576cbd4..6b4e170 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -19,190 +19,34 @@
 
 package org.apache.flink.runtime.operators.hash;
 
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
 import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
 import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
 import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
-import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
-import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.UnionIterator;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
 
-import static org.junit.Assert.fail;
+import static 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
+import static 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData;
 
 /**
  * Test specialized hash join that keeps the build side data (in memory and on 
hard disk)
  * This is used for iterative tasks.
  */
-public class NonReusingReOpenableHashTableITCase {
+public class NonReusingReOpenableHashTableITCase extends 
ReOpenableHashTableTestBase {
 
-       private static final int PAGE_SIZE = 8 * 1024;
-       private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages.
-
-       private static final long SEED1 = 561349061987311L;
-       private static final long SEED2 = 231434613412342L;
-
-       private static final int NUM_PROBES = 3; // number of reopenings of 
hash join
-
-       private final AbstractInvokable parentTask = new DummyInvokable();
-
-       private IOManager ioManager;
-       private MemoryManager memoryManager;
-
-       private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
-       private TypeComparator<Tuple2<Integer, String>> record1Comparator;
-       private TypeComparator<Tuple2<Integer, String>> record2Comparator;
-       private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, 
String>> recordPairComparator;
-
-
-
-
-       private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-       private TypeSerializer<Tuple2<Integer, Integer>> 
recordBuildSideAccesssor;
-       private TypeSerializer<Tuple2<Integer, Integer>> 
recordProbeSideAccesssor;
-       private TypeComparator<Tuple2<Integer, Integer>> 
recordBuildSideComparator;
-       private TypeComparator<Tuple2<Integer, Integer>> 
recordProbeSideComparator;
-       private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, 
Integer>> pactRecordComparator;
-
-       @SuppressWarnings({"unchecked", "rawtypes"})
-       @Before
-       public void beforeTest() {
-               this.recordSerializer = TestData.getIntStringTupleSerializer();
-
-               this.record1Comparator = TestData.getIntStringTupleComparator();
-               this.record2Comparator = TestData.getIntStringTupleComparator();
-               this.recordPairComparator = new 
GenericPairComparator(this.record1Comparator, this.record2Comparator);
-
-               this.recordBuildSideAccesssor = 
TestData.getIntIntTupleSerializer();
-               this.recordProbeSideAccesssor = 
TestData.getIntIntTupleSerializer();
-               this.recordBuildSideComparator = 
TestData.getIntIntTupleComparator();
-               this.recordProbeSideComparator = 
TestData.getIntIntTupleComparator();
-               this.pactRecordComparator = new 
GenericPairComparator(this.recordBuildSideComparator, 
this.recordProbeSideComparator);
-
-               this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, 
PAGE_SIZE, MemoryType.HEAP, true);
-               this.ioManager = new IOManagerAsync();
-       }
-
-       @After
-       public void afterTest() {
-               if (this.ioManager != null) {
-                       this.ioManager.shutdown();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
-                       this.ioManager = null;
-               }
-
-               if (this.memoryManager != null) {
-                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
-                               this.memoryManager.verifyEmpty());
-                       this.memoryManager.shutdown();
-                       this.memoryManager = null;
-               }
-       }
-
-
-       /**
-        * Test behavior with overflow buckets (Overflow buckets must be 
initialized correctly
-        * if the input is reopened again)
-        */
-       @Test
-       public void testOverflow() {
-
-               int buildSize = 1000;
-               int probeSize = 1000;
-               try {
-                       TupleGenerator bgen = new TupleGenerator(SEED1, 200, 
1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-                       TupleGenerator pgen = new TupleGenerator(SEED2, 0, 
1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-
-                       final TestData.TupleGeneratorIterator buildInput = new 
TestData.TupleGeneratorIterator(bgen, buildSize);
-                       final TestData.TupleGeneratorIterator probeInput = new 
TestData.TupleGeneratorIterator(pgen, probeSize);
-                       doTest(buildInput,probeInput, bgen, pgen);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-
-       /**
-        * Verify proper operation if the build side is spilled to disk.
-        */
-       @Test
-       public void testDoubleProbeSpilling() {
-
-               int buildSize = 1000;
-               int probeSize = 1000;
-               try {
-                       TupleGenerator bgen = new TupleGenerator(SEED1, 0, 
1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-                       TupleGenerator pgen = new TupleGenerator(SEED2, 0, 
1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-
-                       final TestData.TupleGeneratorIterator buildInput = new 
TestData.TupleGeneratorIterator(bgen, buildSize);
-                       final TestData.TupleGeneratorIterator probeInput = new 
TestData.TupleGeneratorIterator(pgen, probeSize);
-                       doTest(buildInput,probeInput, bgen, pgen);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-
-       /**
-        * This test case verifies that hybrid hash join is able to handle 
multiple probe phases
-        * when the build side fits completely into memory.
-        */
-       @Test
-       public void testDoubleProbeInMemory() {
-
-               int buildSize = 1000;
-               int probeSize = 1000;
-               try {
-                       TupleGenerator bgen = new TupleGenerator(SEED1, 0, 28, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
-                       TupleGenerator pgen = new TupleGenerator(SEED2, 0, 28, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
-
-                       final TestData.TupleGeneratorIterator buildInput = new 
TestData.TupleGeneratorIterator(bgen, buildSize);
-                       final TestData.TupleGeneratorIterator probeInput = new 
TestData.TupleGeneratorIterator(pgen, probeSize);
-
-                       doTest(buildInput,probeInput, bgen, pgen);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-
-       private void doTest(TestData.TupleGeneratorIterator buildInput, 
TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator 
pgen) throws Exception {
+       protected void doTest(TestData.TupleGeneratorIterator buildInput, 
TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator 
pgen) throws Exception {
                // collect expected data
-               final Map<Integer, Collection<TupleMatch>> 
expectedFirstMatchesMap = 
NonReusingHashJoinIteratorITCase.joinTuples(NonReusingHashJoinIteratorITCase.collectTupleData(buildInput),
 NonReusingHashJoinIteratorITCase.collectTupleData(probeInput));
+               final Map<Integer, Collection<TupleMatch>> 
expectedFirstMatchesMap = joinTuples(collectTupleData(buildInput), 
collectTupleData(probeInput));
 
                final List<Map<Integer, Collection<TupleMatch>>> 
expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
                final FlatJoinFunction[] nMatcher = new 
TupleMatchRemovingJoin[NUM_PROBES];
@@ -259,260 +103,5 @@ public class NonReusingReOpenableHashTableITCase {
                iterator.close();
        }
 
-       //
-       //
-       //      Tests taken from HahTableITCase!
-       //
-       //
-
-       private MutableObjectIterator<Tuple2<Integer, Integer>> 
getProbeInput(final int numKeys,
-                       final int probeValsPerKey, final int repeatedValue1, 
final int repeatedValue2) {
-               MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new 
UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
-               MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new 
TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
-               MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new 
TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
-               List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = 
new ArrayList<>();
-               probes.add(probe1);
-               probes.add(probe2);
-               probes.add(probe3);
-               return new UnionIterator<>(probes);
-       }
-
-       @Test
-       public void testSpillingHashJoinWithMassiveCollisions() throws 
IOException
-       {
-               // the following two values are known to have a hash-code 
collision on the initial level.
-               // we use them to make sure one partition grows 
over-proportionally large
-               final int REPEATED_VALUE_1 = 40559;
-               final int REPEATED_VALUE_2 = 92882;
-               final int REPEATED_VALUE_COUNT_BUILD = 200000;
-               final int REPEATED_VALUE_COUNT_PROBE = 5;
-
-               final int NUM_KEYS = 1000000;
-               final int BUILD_VALS_PER_KEY = 3;
-               final int PROBE_VALS_PER_KEY = 10;
-
-               // create a build input that gives 3 million pairs with 3 
values sharing the same key, plus 400k pairs with two colliding keys
-               MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new 
UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-               MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new 
TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, 
REPEATED_VALUE_COUNT_BUILD);
-               MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new 
TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, 
REPEATED_VALUE_COUNT_BUILD);
-               List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = 
new ArrayList<>();
-               builds.add(build1);
-               builds.add(build2);
-               builds.add(build3);
-               MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = 
new UnionIterator<>(builds);
-
-
-
-
-               // allocate the memory for the HashTable
-               List<MemorySegment> memSegments;
-               try {
-                       memSegments = 
this.memoryManager.allocatePages(MEM_OWNER, 896);
-               }
-               catch (MemoryAllocationException maex) {
-                       fail("Memory for the Join could not be provided.");
-                       return;
-               }
-
-               // create the map for validating the results
-               HashMap<Integer, Long> map = new HashMap<Integer, 
Long>(NUM_KEYS);
-
-               // 
----------------------------------------------------------------------------------------
-
-               final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
-                               this.recordBuildSideAccesssor, 
this.recordProbeSideAccesssor,
-                               this.recordBuildSideComparator, 
this.recordProbeSideComparator, this.pactRecordComparator,
-                               memSegments, ioManager, true);
-
-               for (int probe = 0; probe < NUM_PROBES; probe++) {
-                       // create a probe input that gives 10 million pairs 
with 10 values sharing a key
-                       MutableObjectIterator<Tuple2<Integer, Integer>> 
probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, 
REPEATED_VALUE_2);
-                       if(probe == 0) {
-                               join.open(buildInput, probeInput);
-                       } else {
-                               join.reopenProbe(probeInput);
-                       }
-
-                       Tuple2<Integer, Integer> record;
-                       final Tuple2<Integer, Integer> recordReuse = new 
Tuple2<>();
-
-                       while (join.nextRecord()) {
-                               long numBuildValues = 0;
-
-                               final Tuple2<Integer, Integer> probeRec = 
join.getCurrentProbeRecord();
-                               Integer key = probeRec.f0;
-                               
-                               MutableObjectIterator<Tuple2<Integer, Integer>> 
buildSide = join.getBuildSideIterator();
-                               if ((record = buildSide.next(recordReuse)) != 
null) {
-                                       numBuildValues = 1;
-                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.f0);
-                               }
-                               else {
-                                       fail("No build side values found for a 
probe key.");
-                               }
-                               while ((record = buildSide.next(record)) != 
null) {
-                                       numBuildValues++;
-                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.f0);
-                               }
-
-                               Long contained = map.get(key);
-                               if (contained == null) {
-                                       contained = numBuildValues;
-                               }
-                               else {
-                                       contained = contained + numBuildValues;
-                               }
-
-                               map.put(key, contained);
-                       }
-               }
-
-               join.close();
-
-               Assert.assertEquals("Wrong number of keys", NUM_KEYS, 
map.size());
-               for (Entry<Integer, Long> entry : map.entrySet()) {
-                       long val = entry.getValue();
-                       int key = entry.getKey();
-
-                       if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) 
{
-                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key,
-                                                       (PROBE_VALS_PER_KEY + 
REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) 
* NUM_PROBES, val);
-                       } else {
-                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key,
-                                                       PROBE_VALS_PER_KEY * 
BUILD_VALS_PER_KEY * NUM_PROBES, val);
-                       }
-               }
-
-
-               // 
----------------------------------------------------------------------------------------
-
-               this.memoryManager.release(join.getFreedMemory());
-       }
-
-       /*
-        * This test is basically identical to the 
"testSpillingHashJoinWithMassiveCollisions" test, only that the number
-        * of repeated values (causing bucket collisions) are large enough to 
make sure that their target partition no longer
-        * fits into memory by itself and needs to be repartitioned in the 
recursion again.
-        */
-       @Test
-       public void testSpillingHashJoinWithTwoRecursions() throws IOException
-       {
-               // the following two values are known to have a hash-code 
collision on the first recursion level.
-               // we use them to make sure one partition grows 
over-proportionally large
-               final int REPEATED_VALUE_1 = 40559;
-               final int REPEATED_VALUE_2 = 92882;
-               final int REPEATED_VALUE_COUNT_BUILD = 200000;
-               final int REPEATED_VALUE_COUNT_PROBE = 5;
-
-               final int NUM_KEYS = 1000000;
-               final int BUILD_VALS_PER_KEY = 3;
-               final int PROBE_VALS_PER_KEY = 10;
-
-               // create a build input that gives 3 million pairs with 3 
values sharing the same key, plus 400k pairs with two colliding keys
-               MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new 
UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-               MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new 
TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, 
REPEATED_VALUE_COUNT_BUILD);
-               MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new 
TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, 
REPEATED_VALUE_COUNT_BUILD);
-               List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = 
new ArrayList<>();
-               builds.add(build1);
-               builds.add(build2);
-               builds.add(build3);
-               MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = 
new UnionIterator<>(builds);
-
-
-               // allocate the memory for the HashTable
-               List<MemorySegment> memSegments;
-               try {
-                       memSegments = 
this.memoryManager.allocatePages(MEM_OWNER, 896);
-               }
-               catch (MemoryAllocationException maex) {
-                       fail("Memory for the Join could not be provided.");
-                       return;
-               }
-
-               // create the map for validating the results
-               HashMap<Integer, Long> map = new HashMap<Integer, 
Long>(NUM_KEYS);
-
-               // 
----------------------------------------------------------------------------------------
-
-               final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
-                               this.recordBuildSideAccesssor, 
this.recordProbeSideAccesssor,
-                               this.recordBuildSideComparator, 
this.recordProbeSideComparator, this.pactRecordComparator,
-                               memSegments, ioManager, true);
-               
-               for (int probe = 0; probe < NUM_PROBES; probe++) {
-                       // create a probe input that gives 10 million pairs 
with 10 values sharing a key
-                       MutableObjectIterator<Tuple2<Integer, Integer>> 
probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, 
REPEATED_VALUE_2);
-                       if (probe == 0) {
-                               join.open(buildInput, probeInput);
-                       } else {
-                               join.reopenProbe(probeInput);
-                       }
-                       Tuple2<Integer, Integer> record;
-                       final Tuple2<Integer, Integer> recordReuse = new 
Tuple2<>();
-
-                       while (join.nextRecord()) {
-                               long numBuildValues = 0;
-
-                               final Tuple2<Integer, Integer> probeRec = 
join.getCurrentProbeRecord();
-                               Integer key = probeRec.f0;
-                               
-                               MutableObjectIterator<Tuple2<Integer, Integer>> 
buildSide = join.getBuildSideIterator();
-                               if ((record = buildSide.next(recordReuse)) != 
null) {
-                                       numBuildValues = 1;
-                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.f0);
-                               }
-                               else {
-                                       fail("No build side values found for a 
probe key.");
-                               }
-                               while ((record = buildSide.next(recordReuse)) 
!= null) {
-                                       numBuildValues++;
-                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.f0);
-                               }
-
-                               Long contained = map.get(key);
-                               if (contained == null) {
-                                       contained = numBuildValues;
-                               }
-                               else {
-                                       contained = contained + numBuildValues;
-                               }
-
-                               map.put(key, contained);
-                       }
-               }
-
-               join.close();
-               Assert.assertEquals("Wrong number of keys", NUM_KEYS, 
map.size());
-               for (Entry<Integer, Long> entry : map.entrySet()) {
-                       long val = entry.getValue();
-                       int key = entry.getKey();
-
-                       if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) 
{
-                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key,
-                                                       (PROBE_VALS_PER_KEY + 
REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) 
* NUM_PROBES, val);
-                       } else {
-                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key,
-                                                       PROBE_VALS_PER_KEY * 
BUILD_VALS_PER_KEY * NUM_PROBES, val);
-                       }
-               }
-
-
-               // 
----------------------------------------------------------------------------------------
-
-               this.memoryManager.release(join.getFreedMemory());
-       }
-
 
-       static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, 
Collection<TupleMatch>> expectedSecondMatchesMap) {
-               Map<Integer, Collection<TupleMatch>> copy = new 
HashMap<>(expectedSecondMatchesMap.size());
-               for(Entry<Integer, Collection<TupleMatch>> entry : 
expectedSecondMatchesMap.entrySet()) {
-                       List<TupleMatch> matches = new 
ArrayList<TupleMatch>(entry.getValue().size());
-                       for(TupleMatch m : entry.getValue()) {
-                               matches.add(m);
-                       }
-                       copy.put(entry.getKey(), matches);
-               }
-               return copy;
-       }
-       
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53134594/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
new file mode 100644
index 0000000..f667c87
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+
+public class ReOpenableHashTableITCase {
+
+       private static final int PAGE_SIZE = 8 * 1024;
+       private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 1000 Pages.
+
+       private static final int NUM_PROBES = 3; // number of probe phases of the 
hash join (initial open plus reopenings)
+
+       private IOManager ioManager;
+       private MemoryManager memoryManager;
+
+       private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+       private TypeSerializer<Tuple2<Integer, Integer>> 
recordBuildSideAccesssor;
+       private TypeSerializer<Tuple2<Integer, Integer>> 
recordProbeSideAccesssor;
+       private TypeComparator<Tuple2<Integer, Integer>> 
recordBuildSideComparator;
+       private TypeComparator<Tuple2<Integer, Integer>> 
recordProbeSideComparator;
+       private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, 
Integer>> pactRecordComparator;
+
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       @Before
+       public void beforeTest() {
+               this.recordBuildSideAccesssor = 
TestData.getIntIntTupleSerializer();
+               this.recordProbeSideAccesssor = 
TestData.getIntIntTupleSerializer();
+               this.recordBuildSideComparator = 
TestData.getIntIntTupleComparator();
+               this.recordProbeSideComparator = 
TestData.getIntIntTupleComparator();
+               this.pactRecordComparator = new 
GenericPairComparator(this.recordBuildSideComparator, 
this.recordProbeSideComparator);
+
+               this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, 
PAGE_SIZE, MemoryType.HEAP, true);
+               this.ioManager = new IOManagerAsync();
+       }
+
+       @After
+       public void afterTest() {
+               if (this.ioManager != null) {
+                       this.ioManager.shutdown();
+                       if (!this.ioManager.isProperlyShutDown()) {
+                               Assert.fail("I/O manager failed to properly 
shut down.");
+                       }
+                       this.ioManager = null;
+               }
+
+               if (this.memoryManager != null) {
+                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
+                               this.memoryManager.verifyEmpty());
+                       this.memoryManager.shutdown();
+                       this.memoryManager = null;
+               }
+       }
+
+       private MutableObjectIterator<Tuple2<Integer, Integer>> 
getProbeInput(final int numKeys,
+                                                                               
                                                                  final int 
probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
+               MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new 
UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
+               MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new 
TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
+               MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new 
TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
+               List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = 
new ArrayList<>();
+               probes.add(probe1);
+               probes.add(probe2);
+               probes.add(probe3);
+               return new UnionIterator<>(probes);
+       }
+
+       @Test
+       public void testSpillingHashJoinWithMassiveCollisions() throws 
IOException
+       {
+               // the following two values are known to have a hash-code 
collision on the initial level.
+               // we use them to make sure one partition grows 
over-proportionally large
+               final int REPEATED_VALUE_1 = 40559;
+               final int REPEATED_VALUE_2 = 92882;
+               final int REPEATED_VALUE_COUNT_BUILD = 200000;
+               final int REPEATED_VALUE_COUNT_PROBE = 5;
+
+               final int NUM_KEYS = 1000000;
+               final int BUILD_VALS_PER_KEY = 3;
+               final int PROBE_VALS_PER_KEY = 10;
+
+               // create a build input that gives 3 million pairs with 3 
values sharing the same key, plus 400k pairs with two colliding keys
+               MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new 
UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+               MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new 
TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, 
REPEATED_VALUE_COUNT_BUILD);
+               MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new 
TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, 
REPEATED_VALUE_COUNT_BUILD);
+               List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = 
new ArrayList<>();
+               builds.add(build1);
+               builds.add(build2);
+               builds.add(build3);
+               MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = 
new UnionIterator<>(builds);
+
+
+
+
+               // allocate the memory for the HashTable
+               List<MemorySegment> memSegments;
+               try {
+                       memSegments = 
this.memoryManager.allocatePages(MEM_OWNER, 896);
+               }
+               catch (MemoryAllocationException maex) {
+                       fail("Memory for the Join could not be provided.");
+                       return;
+               }
+
+               // create the map for validating the results
+               HashMap<Integer, Long> map = new HashMap<Integer, 
Long>(NUM_KEYS);
+
+               // 
----------------------------------------------------------------------------------------
+
+               final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
+                       this.recordBuildSideAccesssor, 
this.recordProbeSideAccesssor,
+                       this.recordBuildSideComparator, 
this.recordProbeSideComparator, this.pactRecordComparator,
+                       memSegments, ioManager, true);
+
+               for (int probe = 0; probe < NUM_PROBES; probe++) {
+                       // create a probe input that gives 10 million pairs 
with 10 values sharing a key
+                       MutableObjectIterator<Tuple2<Integer, Integer>> 
probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, 
REPEATED_VALUE_2);
+                       if(probe == 0) {
+                               join.open(buildInput, probeInput);
+                       } else {
+                               join.reopenProbe(probeInput);
+                       }
+
+                       Tuple2<Integer, Integer> record;
+                       final Tuple2<Integer, Integer> recordReuse = new 
Tuple2<>();
+
+                       while (join.nextRecord()) {
+                               long numBuildValues = 0;
+
+                               final Tuple2<Integer, Integer> probeRec = 
join.getCurrentProbeRecord();
+                               Integer key = probeRec.f0;
+
+                               MutableObjectIterator<Tuple2<Integer, Integer>> 
buildSide = join.getBuildSideIterator();
+                               if ((record = buildSide.next(recordReuse)) != 
null) {
+                                       numBuildValues = 1;
+                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.f0);
+                               }
+                               else {
+                                       fail("No build side values found for a 
probe key.");
+                               }
+                               while ((record = buildSide.next(recordReuse)) 
!= null) {
+                                       numBuildValues++;
+                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.f0);
+                               }
+
+                               Long contained = map.get(key);
+                               if (contained == null) {
+                                       contained = numBuildValues;
+                               }
+                               else {
+                                       contained = contained + numBuildValues;
+                               }
+
+                               map.put(key, contained);
+                       }
+               }
+
+               join.close();
+               Assert.assertEquals("Wrong number of keys", NUM_KEYS, 
map.size());
+               for (Map.Entry<Integer, Long> entry : map.entrySet()) {
+                       long val = entry.getValue();
+                       int key = entry.getKey();
+
+                       if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) 
{
+                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key,
+                                       (PROBE_VALS_PER_KEY + 
REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) 
* NUM_PROBES, val);
+                       } else {
+                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key,
+                                       PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY 
* NUM_PROBES, val);
+                       }
+               }
+
+
+               // 
----------------------------------------------------------------------------------------
+
+               this.memoryManager.release(join.getFreedMemory());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53134594/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
new file mode 100644
index 0000000..c1b87b0
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGeneratorIterator;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import java.util.*;
+
+public abstract class ReOpenableHashTableTestBase {
+
+       protected static final int PAGE_SIZE = 8 * 1024;
+       protected static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 1000 
Pages.
+
+       protected static final long SEED1 = 561349061987311L;
+       protected static final long SEED2 = 231434613412342L;
+
+       protected static final int NUM_PROBES = 3; // number of reopenings of 
hash join
+
+       protected final AbstractInvokable parentTask = new DummyInvokable();
+
+       protected IOManager ioManager;
+       protected MemoryManager memoryManager;
+
+       protected TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+       protected TypeComparator<Tuple2<Integer, String>> record1Comparator;
+       protected TypeComparator<Tuple2<Integer, String>> record2Comparator;
+       protected TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, 
String>> recordPairComparator;
+
+       protected TypeSerializer<Tuple2<Integer, Integer>> 
recordBuildSideAccesssor;
+       protected TypeSerializer<Tuple2<Integer, Integer>> 
recordProbeSideAccesssor;
+       protected TypeComparator<Tuple2<Integer, Integer>> 
recordBuildSideComparator;
+       protected TypeComparator<Tuple2<Integer, Integer>> 
recordProbeSideComparator;
+       protected TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, 
Integer>> pactRecordComparator;
+
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       @Before
+       public void beforeTest() {
+               this.recordSerializer = TestData.getIntStringTupleSerializer();
+
+               this.record1Comparator = TestData.getIntStringTupleComparator();
+               this.record2Comparator = TestData.getIntStringTupleComparator();
+               this.recordPairComparator = new 
GenericPairComparator(this.record1Comparator, this.record2Comparator);
+
+               this.recordBuildSideAccesssor = 
TestData.getIntIntTupleSerializer();
+               this.recordProbeSideAccesssor = 
TestData.getIntIntTupleSerializer();
+               this.recordBuildSideComparator = 
TestData.getIntIntTupleComparator();
+               this.recordProbeSideComparator = 
TestData.getIntIntTupleComparator();
+               this.pactRecordComparator = new 
GenericPairComparator(this.recordBuildSideComparator, 
this.recordProbeSideComparator);
+
+               this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, 
PAGE_SIZE, MemoryType.HEAP, true);
+               this.ioManager = new IOManagerAsync();
+       }
+
+       @After
+       public void afterTest() {
+               if (this.ioManager != null) {
+                       this.ioManager.shutdown();
+                       if (!this.ioManager.isProperlyShutDown()) {
+                               Assert.fail("I/O manager failed to properly 
shut down.");
+                       }
+                       this.ioManager = null;
+               }
+
+               if (this.memoryManager != null) {
+                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
+                               this.memoryManager.verifyEmpty());
+                       this.memoryManager.shutdown();
+                       this.memoryManager = null;
+               }
+       }
+
+       /**
+        * Test behavior with overflow buckets (Overflow buckets must be 
initialized correctly
+        * if the input is reopened again)
+        */
+       @Test
+       public void testOverflow() {
+
+               int buildSize = 1000;
+               int probeSize = 1000;
+               try {
+                       TupleGenerator bgen = new TupleGenerator(SEED1, 200, 
1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+                       TupleGenerator pgen = new TupleGenerator(SEED2, 0, 
1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+
+                       final TupleGeneratorIterator buildInput = new 
TupleGeneratorIterator(bgen, buildSize);
+                       final TupleGeneratorIterator probeInput = new 
TupleGeneratorIterator(pgen, probeSize);
+                       doTest(buildInput,probeInput, bgen, pgen);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+
+       /**
+        * Verify proper operation if the build side is spilled to disk.
+        */
+       @Test
+       public void testDoubleProbeSpilling() {
+
+               int buildSize = 1000;
+               int probeSize = 1000;
+               try {
+                       TupleGenerator bgen = new TupleGenerator(SEED1, 0, 
1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+                       TupleGenerator pgen = new TupleGenerator(SEED2, 0, 
1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+
+                       final TupleGeneratorIterator buildInput = new 
TupleGeneratorIterator(bgen, buildSize);
+                       final TupleGeneratorIterator probeInput = new 
TupleGeneratorIterator(pgen, probeSize);
+                       doTest(buildInput,probeInput, bgen, pgen);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+
+       /**
+        * This test case verifies that hybrid hash join is able to handle 
multiple probe phases
+        * when the build side fits completely into memory.
+        */
+       @Test
+       public void testDoubleProbeInMemory() {
+
+               int buildSize = 1000;
+               int probeSize = 1000;
+               try {
+                       TupleGenerator bgen = new TupleGenerator(SEED1, 0, 28, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
+                       TupleGenerator pgen = new TupleGenerator(SEED2, 0, 28, 
KeyMode.SORTED, ValueMode.FIX_LENGTH);
+
+                       final TupleGeneratorIterator buildInput = new 
TupleGeneratorIterator(bgen, buildSize);
+                       final TupleGeneratorIterator probeInput = new 
TupleGeneratorIterator(pgen, probeSize);
+
+                       doTest(buildInput,probeInput, bgen, pgen);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
+               }
+       }
+
+       abstract protected void doTest(TupleGeneratorIterator buildInput, 
TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, 
TestData.TupleGenerator pgen) throws Exception;
+
+       static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, 
Collection<TupleMatch>> expectedSecondMatchesMap) {
+               Map<Integer, Collection<TupleMatch>> copy = new 
HashMap<>(expectedSecondMatchesMap.size());
+               for(Map.Entry<Integer, Collection<TupleMatch>> entry : 
expectedSecondMatchesMap.entrySet()) {
+                       List<TupleMatch> matches = new 
ArrayList<TupleMatch>(entry.getValue().size());
+                       for(TupleMatch m : entry.getValue()) {
+                               matches.add(m);
+                       }
+                       copy.put(entry.getKey(), matches);
+               }
+               return copy;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53134594/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index 6afde16..af3a894 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -19,44 +19,21 @@
 
 package org.apache.flink.runtime.operators.hash;
 
-import static org.junit.Assert.fail;
+import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
+import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
-import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
-import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
 
 import static 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
 import static 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData;
@@ -65,144 +42,9 @@ import static 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIterator
  * Test specialized hash join that keeps the build side data (in memory and on 
hard disk)
  * This is used for iterative tasks.
  */
-public class ReusingReOpenableHashTableITCase {
-       
-       private static final int PAGE_SIZE = 8 * 1024;
-       private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages.
+public class ReusingReOpenableHashTableITCase extends 
ReOpenableHashTableTestBase {
 
-       private static final long SEED1 = 561349061987311L;
-       private static final long SEED2 = 231434613412342L;
-       
-       private static final int NUM_PROBES = 3; // number of reopenings of 
hash join
-       
-       private final AbstractInvokable parentTask = new DummyInvokable();
-
-       private IOManager ioManager;
-       private MemoryManager memoryManager;
-       
-       private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
-       private TypeComparator<Tuple2<Integer, String>> record1Comparator;
-       private TypeComparator<Tuple2<Integer, String>> record2Comparator;
-       private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, 
String>> recordPairComparator;
-       
-       
-       
-       
-       private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-       private TypeSerializer<Tuple2<Integer, Integer>> 
recordBuildSideAccesssor;
-       private TypeSerializer<Tuple2<Integer, Integer>> 
recordProbeSideAccesssor;
-       private TypeComparator<Tuple2<Integer, Integer>> 
recordBuildSideComparator;
-       private TypeComparator<Tuple2<Integer, Integer>> 
recordProbeSideComparator;
-       private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, 
Integer>> pactRecordComparator;
-
-       @SuppressWarnings({"unchecked", "rawtypes"})
-       @Before
-       public void beforeTest() {
-               this.recordSerializer = TestData.getIntStringTupleSerializer();
-               
-               this.record1Comparator = TestData.getIntStringTupleComparator();
-               this.record2Comparator = TestData.getIntStringTupleComparator();
-               this.recordPairComparator = new 
GenericPairComparator(this.record1Comparator, this.record2Comparator);
-               
-               this.recordBuildSideAccesssor = 
TestData.getIntIntTupleSerializer();
-               this.recordProbeSideAccesssor = 
TestData.getIntIntTupleSerializer();
-               this.recordBuildSideComparator = 
TestData.getIntIntTupleComparator();
-               this.recordProbeSideComparator = 
TestData.getIntIntTupleComparator();
-               this.pactRecordComparator = new 
GenericPairComparator(this.recordBuildSideComparator, 
this.recordProbeSideComparator);
-               
-               this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, 
PAGE_SIZE, MemoryType.HEAP, true);
-               this.ioManager = new IOManagerAsync();
-       }
-
-       @After
-       public void afterTest() {
-               if (this.ioManager != null) {
-                       this.ioManager.shutdown();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
-                       this.ioManager = null;
-               }
-               
-               if (this.memoryManager != null) {
-                       Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
-                               this.memoryManager.verifyEmpty());
-                       this.memoryManager.shutdown();
-                       this.memoryManager = null;
-               }
-       }
-       
-       
-       /**
-        * Test behavior with overflow buckets (Overflow buckets must be 
initialized correctly 
-        * if the input is reopened again)
-        */
-       @Test
-       public void testOverflow() {
-               
-               int buildSize = 1000;
-               int probeSize = 1000;
-               try {
-                       TestData.TupleGenerator bgen = new 
TestData.TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-                       TestData.TupleGenerator pgen = new 
TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-                       
-                       final TestData.TupleGeneratorIterator buildInput = new 
TestData.TupleGeneratorIterator(bgen, buildSize);
-                       final TestData.TupleGeneratorIterator probeInput = new 
TestData.TupleGeneratorIterator(pgen, probeSize);
-                       doTest(buildInput,probeInput, bgen, pgen);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       /**
-        * Verify proper operation if the build side is spilled to disk.
-        */
-       @Test
-       public void testDoubleProbeSpilling() {
-               
-               int buildSize = 1000;
-               int probeSize = 1000;
-               try {
-                       TestData.TupleGenerator bgen = new 
TestData.TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-                       TestData.TupleGenerator pgen = new 
TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-                       
-                       final TestData.TupleGeneratorIterator buildInput = new 
TestData.TupleGeneratorIterator(bgen, buildSize);
-                       final TestData.TupleGeneratorIterator probeInput = new 
TestData.TupleGeneratorIterator(pgen, probeSize);
-                       doTest(buildInput,probeInput, bgen, pgen);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       /**
-        * This test case verifies that hybrid hash join is able to handle 
multiple probe phases
-        * when the build side fits completely into memory.
-        */
-       @Test
-       public void testDoubleProbeInMemory() {
-               
-               int buildSize = 1000;
-               int probeSize = 1000;
-               try {
-                       TestData.TupleGenerator bgen = new 
TestData.TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-                       TestData.TupleGenerator pgen = new 
TestData.TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-                       
-                       final TestData.TupleGeneratorIterator buildInput = new 
TestData.TupleGeneratorIterator(bgen, buildSize);
-                       final TestData.TupleGeneratorIterator probeInput = new 
TestData.TupleGeneratorIterator(pgen, probeSize);
-                       
-                       doTest(buildInput,probeInput, bgen, pgen);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An exception occurred during the test: " + 
e.getMessage());
-               }
-       }
-       
-       private void doTest(TestData.TupleGeneratorIterator buildInput, 
TestData.TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, 
TestData.TupleGenerator pgen) throws Exception {
+       protected void doTest(TestData.TupleGeneratorIterator buildInput, 
TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator 
pgen) throws Exception {
                // collect expected data
                final Map<Integer, Collection<TupleMatch>> 
expectedFirstMatchesMap = joinTuples(collectTupleData(buildInput), 
collectTupleData(probeInput));
                
@@ -260,257 +102,4 @@ public class ReusingReOpenableHashTableITCase {
                
                iterator.close();
        }
-       
-       //
-       //
-       //      Tests taken from HahTableITCase!
-       //
-       //
-       
-       private MutableObjectIterator<Tuple2<Integer, Integer>> 
getProbeInput(final int numKeys,
-                       final int probeValsPerKey, final int repeatedValue1, 
final int repeatedValue2) {
-               MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new 
UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
-               MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new 
TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
-               MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new 
TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
-               List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = 
new ArrayList<>();
-               probes.add(probe1);
-               probes.add(probe2);
-               probes.add(probe3);
-               return new UnionIterator<>(probes);
-       }
-       
-       @Test
-       public void testSpillingHashJoinWithMassiveCollisions() throws 
IOException {
-               // the following two values are known to have a hash-code 
collision on the initial level.
-               // we use them to make sure one partition grows 
over-proportionally large
-               final int REPEATED_VALUE_1 = 40559;
-               final int REPEATED_VALUE_2 = 92882;
-               final int REPEATED_VALUE_COUNT_BUILD = 200000;
-               final int REPEATED_VALUE_COUNT_PROBE = 5;
-               
-               final int NUM_KEYS = 1000000;
-               final int BUILD_VALS_PER_KEY = 3;
-               final int PROBE_VALS_PER_KEY = 10;
-               
-               // create a build input that gives 3 million pairs with 3 
values sharing the same key, plus 400k pairs with two colliding keys
-               MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new 
UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-               MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new 
TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, 
REPEATED_VALUE_COUNT_BUILD);
-               MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new 
TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, 
REPEATED_VALUE_COUNT_BUILD);
-               List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = 
new ArrayList<>();
-               builds.add(build1);
-               builds.add(build2);
-               builds.add(build3);
-               MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = 
new UnionIterator<>(builds);
-
-               // allocate the memory for the HashTable
-               List<MemorySegment> memSegments;
-               try {
-                       memSegments = 
this.memoryManager.allocatePages(MEM_OWNER, 896);
-               }
-               catch (MemoryAllocationException maex) {
-                       fail("Memory for the Join could not be provided.");
-                       return;
-               }
-               
-               // create the map for validating the results
-               HashMap<Integer, Long> map = new HashMap<Integer, 
Long>(NUM_KEYS);
-               
-               // 
----------------------------------------------------------------------------------------
-               
-               final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
-                               this.recordBuildSideAccesssor, 
this.recordProbeSideAccesssor, 
-                               this.recordBuildSideComparator, 
this.recordProbeSideComparator, this.pactRecordComparator,
-                               memSegments, ioManager, true);
-               
-               for(int probe = 0; probe < NUM_PROBES; probe++) {
-                       // create a probe input that gives 10 million pairs 
with 10 values sharing a key
-                       MutableObjectIterator<Tuple2<Integer, Integer>> 
probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, 
REPEATED_VALUE_2);
-                       if(probe == 0) {
-                               join.open(buildInput, probeInput);
-                       } else {
-                               join.reopenProbe(probeInput);
-                       }
-               
-                       Tuple2<Integer, Integer> record;
-                       final Tuple2<Integer, Integer> recordReuse = new 
Tuple2<>();
-
-                       while (join.nextRecord()) {
-                               long numBuildValues = 0;
-               
-                               final Tuple2<Integer, Integer> probeRec = 
join.getCurrentProbeRecord();
-                               Integer key = probeRec.f0;
-                               
-                               MutableObjectIterator<Tuple2<Integer, Integer>> 
buildSide = join.getBuildSideIterator();
-                               if ((record = buildSide.next(recordReuse)) != 
null) {
-                                       numBuildValues = 1;
-                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.f0); 
-                               }
-                               else {
-                                       fail("No build side values found for a 
probe key.");
-                               }
-                               while ((record = buildSide.next(record)) != 
null) {
-                                       numBuildValues++;
-                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.f0);
-                               }
-                               
-                               Long contained = map.get(key);
-                               if (contained == null) {
-                                       contained = numBuildValues;
-                               }
-                               else {
-                                       contained = contained + numBuildValues;
-                               }
-                               
-                               map.put(key, contained);
-                       }
-               }
-               
-               join.close();
-               
-               Assert.assertEquals("Wrong number of keys", NUM_KEYS, 
map.size());
-               for (Map.Entry<Integer, Long> entry : map.entrySet()) {
-                       long val = entry.getValue();
-                       int key = entry.getKey();
-       
-                       if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) 
{
-                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key, 
-                                                       (PROBE_VALS_PER_KEY + 
REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) 
* NUM_PROBES, val);
-                       } else {
-                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key, 
-                                                       PROBE_VALS_PER_KEY * 
BUILD_VALS_PER_KEY * NUM_PROBES, val);
-                       }
-               }
-               
-               
-               // 
----------------------------------------------------------------------------------------
-               
-               this.memoryManager.release(join.getFreedMemory());
-       }
-       
-       /*
-        * This test is basically identical to the 
"testSpillingHashJoinWithMassiveCollisions" test, only that the number
-        * of repeated values (causing bucket collisions) are large enough to 
make sure that their target partition no longer
-        * fits into memory by itself and needs to be repartitioned in the 
recursion again.
-        */
-       @Test
-       public void testSpillingHashJoinWithTwoRecursions() throws IOException
-       {
-               // the following two values are known to have a hash-code 
collision on the first recursion level.
-               // we use them to make sure one partition grows 
over-proportionally large
-               final int REPEATED_VALUE_1 = 40559;
-               final int REPEATED_VALUE_2 = 92882;
-               final int REPEATED_VALUE_COUNT_BUILD = 200000;
-               final int REPEATED_VALUE_COUNT_PROBE = 5;
-               
-               final int NUM_KEYS = 1000000;
-               final int BUILD_VALS_PER_KEY = 3;
-               final int PROBE_VALS_PER_KEY = 10;
-               
-               // create a build input that gives 3 million pairs with 3 
values sharing the same key, plus 400k pairs with two colliding keys
-               MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new 
UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-               MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new 
TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, 
REPEATED_VALUE_COUNT_BUILD);
-               MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new 
TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, 
REPEATED_VALUE_COUNT_BUILD);
-               List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = 
new ArrayList<>();
-               builds.add(build1);
-               builds.add(build2);
-               builds.add(build3);
-               MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = 
new UnionIterator<>(builds);
-       
-
-               // allocate the memory for the HashTable
-               List<MemorySegment> memSegments;
-               try {
-                       memSegments = 
this.memoryManager.allocatePages(MEM_OWNER, 896);
-               }
-               catch (MemoryAllocationException maex) {
-                       fail("Memory for the Join could not be provided.");
-                       return;
-               }
-               
-               // create the map for validating the results
-               HashMap<Integer, Long> map = new HashMap<Integer, 
Long>(NUM_KEYS);
-               
-               // 
----------------------------------------------------------------------------------------
-               
-               final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
-                               this.recordBuildSideAccesssor, 
this.recordProbeSideAccesssor, 
-                               this.recordBuildSideComparator, 
this.recordProbeSideComparator, this.pactRecordComparator,
-                               memSegments, ioManager, true);
-               
-               for (int probe = 0; probe < NUM_PROBES; probe++) {
-                       // create a probe input that gives 10 million pairs 
with 10 values sharing a key
-                       MutableObjectIterator<Tuple2<Integer, Integer>> 
probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, 
REPEATED_VALUE_2);
-                       if(probe == 0) {
-                               join.open(buildInput, probeInput);
-                       } else {
-                               join.reopenProbe(probeInput);
-                       }
-                       Tuple2<Integer, Integer> record;
-                       final Tuple2<Integer, Integer> recordReuse = new 
Tuple2<>();
-
-                       while (join.nextRecord())
-                       {       
-                               long numBuildValues = 0;
-                               
-                               final Tuple2<Integer, Integer> probeRec = 
join.getCurrentProbeRecord();
-                               Integer key = probeRec.f0;
-                               
-                               MutableObjectIterator<Tuple2<Integer, Integer>> 
buildSide = join.getBuildSideIterator();
-                               if ((record = buildSide.next(recordReuse)) != 
null) {
-                                       numBuildValues = 1;
-                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.f0); 
-                               }
-                               else {
-                                       fail("No build side values found for a 
probe key.");
-                               }
-                               while ((record = buildSide.next(recordReuse)) 
!= null) {
-                                       numBuildValues++;
-                                       Assert.assertEquals("Probe-side key was 
different than build-side key.", key, record.f0);
-                               }
-                               
-                               Long contained = map.get(key);
-                               if (contained == null) {
-                                       contained = numBuildValues;
-                               }
-                               else {
-                                       contained = contained + numBuildValues;
-                               }
-                               
-                               map.put(key, contained);
-                       }
-               }
-               
-               join.close();
-               Assert.assertEquals("Wrong number of keys", NUM_KEYS, 
map.size());
-               for (Map.Entry<Integer, Long> entry : map.entrySet()) {
-                       long val = entry.getValue();
-                       int key = entry.getKey();
-       
-                       if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) 
{
-                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key, 
-                                                       (PROBE_VALS_PER_KEY + 
REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) 
* NUM_PROBES, val);
-                       } else {
-                               Assert.assertEquals("Wrong number of values in 
per-key cross product for key " + key, 
-                                                       PROBE_VALS_PER_KEY * 
BUILD_VALS_PER_KEY * NUM_PROBES, val);
-                       }
-               }
-               
-               
-               // 
----------------------------------------------------------------------------------------
-               
-               this.memoryManager.release(join.getFreedMemory());
-       }
-       
-       
-       static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, 
Collection<TupleMatch>> expectedSecondMatchesMap) {
-               Map<Integer, Collection<TupleMatch>> copy = new 
HashMap<>(expectedSecondMatchesMap.size());
-               for(Map.Entry<Integer, Collection<TupleMatch>> entry : 
expectedSecondMatchesMap.entrySet()) {
-                       List<TupleMatch> matches = new 
ArrayList<TupleMatch>(entry.getValue().size());
-                       for(TupleMatch m : entry.getValue()) {
-                               matches.add(m);
-                       }
-                       copy.put(entry.getKey(), matches);
-               }
-               return copy;
-       }
 }

Reply via email to