[FLINK-2853] Port MutableHashTablePerformanceBenchmark to JMH.

This closes #1267


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5ee55bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5ee55bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5ee55bd

Branch: refs/heads/master
Commit: e5ee55bdaf0dcd6e3dc38b52964a8570b22dd671
Parents: 75a5257
Author: gallenvara <gallenv...@126.com>
Authored: Fri Oct 16 16:48:30 2015 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Mon Oct 26 20:05:03 2015 +0100

----------------------------------------------------------------------
 flink-benchmark/pom.xml                         |  11 +-
 .../MutableHashTablePerformanceBenchmark.java   | 359 +++++++++++++++++++
 .../MutableHashTablePerformanceBenchmark.java   | 262 --------------
 3 files changed, 368 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e5ee55bd/flink-benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/flink-benchmark/pom.xml b/flink-benchmark/pom.xml
index d6ba1d7..b22e0d1 100644
--- a/flink-benchmark/pom.xml
+++ b/flink-benchmark/pom.xml
@@ -56,14 +56,21 @@ under the License.
       <artifactId>jmh-generator-annprocess</artifactId>
       <version>${jmh.version}</version>
       <scope>provided</scope>
+    </dependency>  
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime</artifactId>
+      <version>0.10-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-core</artifactId>
+      <artifactId>flink-java</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
-    </dependency>  
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ee55bd/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
 
b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
new file mode 100644
index 0000000..186c595
--- /dev/null
+++ 
b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.MutableHashTable;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.types.StringPair;
+import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import static org.junit.Assert.fail;
+
+@State(Scope.Thread)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class MutableHashTablePerformanceBenchmark {
+       
+       private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+       
+       private MemoryManager memManager;
+       private IOManager ioManager;
+       
+       private TypeSerializer<StringPair> pairBuildSideAccesssor;
+       private TypeSerializer<StringPair> pairProbeSideAccesssor;
+       private TypeComparator<StringPair> pairBuildSideComparator;
+       private TypeComparator<StringPair> pairProbeSideComparator;
+       private TypePairComparator<StringPair, StringPair> pairComparator;
+       
+       private static final String COMMENT = "this comments should contains a 
96 byte data, 100 plus another integer value and seperator char.";
+       
+       @Setup
+       public void setup() {
+               this.pairBuildSideAccesssor = new StringPairSerializer();
+               this.pairProbeSideAccesssor = new StringPairSerializer();
+               this.pairBuildSideComparator = new StringPairComparator();
+               this.pairProbeSideComparator = new StringPairComparator();
+               this.pairComparator = new StringPairPairComparator();
+               
+               this.memManager = new MemoryManager(64 * 1024 * 1024, 1);
+               this.ioManager = new IOManagerAsync();
+       }
+       
+       @TearDown
+       public void tearDown() {
+               // shut down I/O manager and Memory Manager and verify the 
correct shutdown
+               this.ioManager.shutdown();
+               if (!this.ioManager.isProperlyShutDown()) {
+                       fail("I/O manager was not property shut down.");
+               }
+               if (!this.memManager.verifyEmpty()) {
+                       fail("Not all memory was properly released to the 
memory manager --> Memory Leak.");
+               }
+       }
+       
+       @Benchmark
+       public void compareMutableHashTableWithBloomFilter1() throws 
IOException {
+               // ----------------------------------------------90% filtered 
during probe spill phase-----------------------------------------
+               // create a build input with 1000000 records with key spread 
between [0 -- 10000000] with step of 10 for nearby records.
+               int buildSize = 1000000;
+               int buildStep = 10;
+               int buildScope = buildStep * buildSize;
+               // create a probe input with 5000000 records with key spread 
between [0 -- 1000000] with distance of 1 for nearby records.
+               int probeSize = 5000000;
+               int probeStep = 1;
+               int probeScope = buildSize;
+               
+               int expectedResult = 500000;
+               
+               this.hybridHashJoin(buildSize, buildStep, buildScope, 
probeSize, probeStep, probeScope, expectedResult, true);
+               
+               System.out.println("HybridHashJoin1:");
+               System.out.println("Build input size: " + 100 * buildSize);
+               System.out.println("Probe input size: " + 100 * probeSize);
+               System.out.println("Available memory: " + 
this.memManager.getMemorySize());
+               System.out.println("Probe record be filtered before spill: " + 
(1 - (double)probeScope / buildScope) * 100 + "% percent.");
+       }
+       
+       @Benchmark
+       public void compareMutableHashTableWithoutBloomFilter1() throws 
IOException {
+               // ----------------------------------------------90% filtered 
during probe spill phase-----------------------------------------
+               // create a build input with 1000000 records with key spread 
between [0 -- 10000000] with step of 10 for nearby records.
+               int buildSize = 1000000;
+               int buildStep = 10;
+               int buildScope = buildStep * buildSize;
+               // create a probe input with 5000000 records with key spread 
between [0 -- 1000000] with distance of 1 for nearby records.
+               int probeSize = 5000000;
+               int probeStep = 1;
+               int probeScope = buildSize;
+               
+               int expectedResult = 500000;
+               
+               this.hybridHashJoin(buildSize, buildStep, buildScope, 
probeSize, probeStep, probeScope, expectedResult, false);
+               
+               System.out.println("HybridHashJoin1:");
+               System.out.println("Build input size: " + 100 * buildSize);
+               System.out.println("Probe input size: " + 100 * probeSize);
+               System.out.println("Available memory: " + 
this.memManager.getMemorySize());
+               System.out.println("Probe record be filtered before spill: " + 
(1 - (double)probeScope / buildScope) * 100 + "% percent.");
+       }
+       
+       @Benchmark
+       public void compareMutableHashTableWithBloomFilter2() throws 
IOException {
+               // ----------------------------------------------80% filtered 
during probe spill phase-----------------------------------------
+               // create a build input with 1000000 records with key spread 
between [0 -- 5000000] with step of 5 for nearby records.
+               int buildSize = 1000000;
+               int buildStep = 5;
+               int buildScope = buildStep * buildSize;
+               // create a probe input with 5000000 records with key spread 
between [0 -- 1000000] with distance of 1 for nearby records.
+               int probeSize = 5000000;
+               int probeStep = 1;
+               int probeScope = buildSize;
+               
+               int expectedResult = 1000000;
+               
+               this.hybridHashJoin(buildSize, buildStep, buildScope, 
probeSize, probeStep, probeScope, expectedResult, true);
+               
+               System.out.println("HybridHashJoin2:");
+               System.out.println("Build input size: " + 100 * buildSize);
+               System.out.println("Probe input size: " + 100 * probeSize);
+               System.out.println("Available memory: " + 
this.memManager.getMemorySize());
+               System.out.println("Probe record be filtered before spill: " + 
(1 - (double)probeScope / buildScope) * 100 + "% percent.");
+       }
+       
+       @Benchmark
+       public void compareMutableHashTableWithoutBloomFilter2() throws 
IOException {
+               // ----------------------------------------------80% filtered 
during probe spill phase-----------------------------------------
+               // create a build input with 1000000 records with key spread 
between [0 -- 5000000] with step of 5 for nearby records.
+               int buildSize = 1000000;
+               int buildStep = 5;
+               int buildScope = buildStep * buildSize;
+               // create a probe input with 5000000 records with key spread 
between [0 -- 1000000] with distance of 1 for nearby records.
+               int probeSize = 5000000;
+               int probeStep = 1;
+               int probeScope = buildSize;
+               
+               int expectedResult = 1000000;
+               
+               this.hybridHashJoin(buildSize, buildStep, buildScope, 
probeSize, probeStep, probeScope, expectedResult, false);
+               
+               System.out.println("HybridHashJoin2:");
+               System.out.println("Build input size: " + 100 * buildSize);
+               System.out.println("Probe input size: " + 100 * probeSize);
+               System.out.println("Available memory: " + 
this.memManager.getMemorySize());
+               System.out.println("Probe record be filtered before spill: " + 
(1 - (double)probeScope / buildScope) * 100 + "% percent.");
+       }
+       
+       @Benchmark
+       public void compareMutableHashTableWithBloomFilter3() throws 
IOException {
+               // ----------------------------------------------50% filtered 
during probe spill phase-------------------------------------------------
+               // create a build input with 1000000 records with key spread 
between [0 -- 2000000] with step of 2 for nearby records.
+               int buildSize = 1000000;
+               int buildStep = 2;
+               int buildScope = buildStep * buildSize;
+               // create a probe input with 5000000 records with key spread 
between [0 -- 1000000] with distance of 1 for nearby records.
+               int probeSize = 5000000;
+               int probeStep = 1;
+               int probeScope = buildSize;
+               
+               int expectedResult = 2500000;
+               
+               this.hybridHashJoin(buildSize, buildStep, buildScope, 
probeSize, probeStep, probeScope, expectedResult, true);
+               
+               System.out.println("HybridHashJoin3:");
+               System.out.println("Build input size: " + 100 * buildSize);
+               System.out.println("Probe input size: " + 100 * probeSize);
+               System.out.println("Available memory: " + 
this.memManager.getMemorySize());
+               System.out.println("Probe record be filtered before spill: " + 
(1 - (double)probeScope / buildScope) * 100 + "% percent.");
+       }
+       
+       @Benchmark
+       public void compareMutableHashTableWithoutBloomFilter3() throws 
IOException {
+               // ----------------------------------------------50% filtered 
during probe spill phase-------------------------------------------------
+               // create a build input with 1000000 records with key spread 
between [0 -- 2000000] with step of 2 for nearby records.
+               int buildSize = 1000000;
+               int buildStep = 2;
+               int buildScope = buildStep * buildSize;
+               // create a probe input with 5000000 records with key spread 
between [0 -- 1000000] with distance of 1 for nearby records.
+               int probeSize = 5000000;
+               int probeStep = 1;
+               int probeScope = buildSize;
+               
+               int expectedResult = 2500000;
+               
+               this.hybridHashJoin(buildSize, buildStep, buildScope, 
probeSize, probeStep, probeScope, expectedResult, false);
+               
+               System.out.println("HybridHashJoin3:");
+               System.out.println("Build input size: " + 100 * buildSize);
+               System.out.println("Probe input size: " + 100 * probeSize);
+               System.out.println("Available memory: " + 
this.memManager.getMemorySize());
+               System.out.println("Probe record be filtered before spill: " + 
(1 - (double)probeScope / buildScope) * 100 + "% percent.");
+       }
+       
+       @Benchmark
+       public void compareMutableHashTableWithBloomFilter4() throws 
IOException {
+               // ----------------------------------------------0% filtered 
during probe spill phase-----------------------------------------
+               // create a build input with 1000000 records with key spread 
between [0 -- 1000000] with step of 1 for nearby records.
+               int buildSize = 1000000;
+               int buildStep = 1;
+               int buildScope = buildStep * buildSize;
+               // create a probe input with 5000000 records with key spread 
between [0 -- 1000000] with distance of 1 for nearby records.
+               int probeSize = 5000000;
+               int probeStep = 1;
+               int probeScope = buildSize;
+               
+               int expectedResult = probeSize / buildStep;
+               
+               this.hybridHashJoin(buildSize, buildStep, buildScope, 
probeSize, probeStep, probeScope, expectedResult, true);
+               
+               System.out.println("HybridHashJoin4:");
+               System.out.println("Build input size: " + 100 * buildSize);
+               System.out.println("Probe input size: " + 100 * probeSize);
+               System.out.println("Available memory: " + 
this.memManager.getMemorySize());
+               System.out.println("Probe record be filtered before spill: " + 
(1 - (double)probeScope / buildScope) * 100 + "% percent.");
+       }
+       
+       @Benchmark
+       public void compareMutableHashTableWithoutBloomFilter4() throws 
IOException {
+               // ----------------------------------------------0% filtered 
during probe spill phase-----------------------------------------
+               // create a build input with 1000000 records with key spread 
between [0 -- 1000000] with step of 1 for nearby records.
+               int buildSize = 1000000;
+               int buildStep = 1;
+               int buildScope = buildStep * buildSize;
+               // create a probe input with 5000000 records with key spread 
between [0 -- 1000000] with distance of 1 for nearby records.
+               int probeSize = 5000000;
+               int probeStep = 1;
+               int probeScope = buildSize;
+               
+               int expectedResult = probeSize / buildStep;
+               
+               this.hybridHashJoin(buildSize, buildStep, buildScope, 
probeSize, probeStep, probeScope, expectedResult, false);
+               
+               System.out.println("HybridHashJoin4:");
+               System.out.println("Build input size: " + 100 * buildSize);
+               System.out.println("Probe input size: " + 100 * probeSize);
+               System.out.println("Available memory: " + 
this.memManager.getMemorySize());
+               System.out.println("Probe record be filtered before spill: " + 
(1 - (double)probeScope / buildScope) * 100 + "% percent.");
+       }
+       
+       private long hybridHashJoin(int buildSize, int buildStep, int 
buildScope, int probeSize,
+               int probeStep, int probeScope, int expectedResultSize, boolean 
enableBloomFilter) throws IOException {
+               
+               InputIterator buildIterator = new InputIterator(buildSize, 
buildStep, buildScope);
+               InputIterator probeIterator = new InputIterator(probeSize, 
probeStep, probeScope);
+               
+               // allocate the memory for the HashTable
+               List<MemorySegment> memSegments;
+               try {
+                       // 33 is minimum number of pages required to perform 
hash join this inputs
+                       memSegments = this.memManager.allocatePages(MEM_OWNER, 
(int) (this.memManager.getMemorySize() / this.memManager.getPageSize()));
+               } catch (MemoryAllocationException maex) {
+                       fail("Memory for the Join could not be provided.");
+                       return -1;
+               }
+               
+               // 
----------------------------------------------------------------------------------------
+               
+               final MutableHashTable<StringPair, StringPair> join = new 
MutableHashTable<StringPair, StringPair>(
+                       this.pairBuildSideAccesssor, 
this.pairProbeSideAccesssor,
+                       this.pairBuildSideComparator, 
this.pairProbeSideComparator, this.pairComparator,
+                       memSegments, ioManager, enableBloomFilter);
+               join.open(buildIterator, probeIterator);
+               
+               final StringPair recordReuse = new StringPair();
+               int numRecordsInJoinResult = 0;
+               
+               while (join.nextRecord()) {
+                       MutableHashTable.HashBucketIterator<StringPair, 
StringPair> buildSide = join.getBuildSideIterator();
+                       while (buildSide.next(recordReuse) != null) {
+                               numRecordsInJoinResult++;
+                       }
+               }
+               Assert.assertEquals("Wrong number of records in join result.", 
expectedResultSize, numRecordsInJoinResult);
+               
+               join.close();
+               // 
----------------------------------------------------------------------------------------
+               
+               this.memManager.release(join.getFreedMemory());
+               return 1;
+       }
+       
+       
+       static class InputIterator implements MutableObjectIterator<StringPair> 
{
+               
+               private int numLeft;
+               private int distance;
+               private int scope;
+               
+               public InputIterator(int size, int distance, int scope) {
+                       this.numLeft = size;
+                       this.distance = distance;
+                       this.scope = scope;
+               }
+               
+               @Override
+               public StringPair next(StringPair reuse) throws IOException {
+                       if (this.numLeft > 0) {
+                               numLeft--;
+                               int currentKey = (numLeft * distance) % scope;
+                               reuse.setKey(Integer.toString(currentKey));
+                               reuse.setValue(COMMENT);
+                               return reuse;
+                       } else {
+                               return null;
+                       }
+               }
+               
+               @Override
+               public StringPair next() throws IOException {
+                       return next(new StringPair());
+               }
+       }
+       
+       public static void main(String[] args) throws Exception {
+               Options opt = new OptionsBuilder()
+                               
.include(MutableHashTablePerformanceBenchmark.class.getSimpleName())
+                               .warmupIterations(2)
+                               .measurementIterations(2)
+                               .forks(1)
+                               .build();
+               new Runner(opt).run();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ee55bd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
deleted file mode 100644
index 70c9427..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.types.StringPair;
-import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
-import 
org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
-import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
-import org.apache.flink.util.MutableObjectIterator;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public class MutableHashTablePerformanceBenchmark {
-       
-       private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-       
-       private MemoryManager memManager;
-       private IOManager ioManager;
-       
-       private TypeSerializer<StringPair> pairBuildSideAccesssor;
-       private TypeSerializer<StringPair> pairProbeSideAccesssor;
-       private TypeComparator<StringPair> pairBuildSideComparator;
-       private TypeComparator<StringPair> pairProbeSideComparator;
-       private TypePairComparator<StringPair, StringPair> pairComparator;
-       
-       private static final String COMMENT = "this comments should contains a 
96 byte data, 100 plus another integer value and seperator char.";
-       
-       
-       @Before
-       public void setup() {
-               this.pairBuildSideAccesssor = new StringPairSerializer();
-               this.pairProbeSideAccesssor = new StringPairSerializer();
-               this.pairBuildSideComparator = new StringPairComparator();
-               this.pairProbeSideComparator = new StringPairComparator();
-               this.pairComparator = new StringPairPairComparator();
-               
-               this.memManager = new MemoryManager(64 * 1024 * 1024, 1);
-               this.ioManager = new IOManagerAsync();
-       }
-       
-       @After
-       public void tearDown() {
-               // shut down I/O manager and Memory Manager and verify the 
correct shutdown
-               this.ioManager.shutdown();
-               if (!this.ioManager.isProperlyShutDown()) {
-                       fail("I/O manager was not property shut down.");
-               }
-               if (!this.memManager.verifyEmpty()) {
-                       fail("Not all memory was properly released to the 
memory manager --> Memory Leak.");
-               }
-       }
-       
-       @Test
-       public void compareMutableHashTablePerformance1() throws IOException {
-               // ----------------------------------------------90% filtered 
during probe spill phase-----------------------------------------
-               // create a build input with 1000000 records with key spread 
between [0 -- 10000000] with step of 10 for nearby records.
-               int buildSize = 1000000;
-               int buildStep = 10;
-               int buildScope = buildStep * buildSize;
-               // create a probe input with 5000000 records with key spread 
between [0 -- 1000000] with distance of 1 for nearby records.
-               int probeSize = 5000000;
-               int probeStep = 1;
-               int probeScope = buildSize;
-               
-               int expectedResult = 500000;
-               
-               long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, 
buildScope, probeSize, probeStep, probeScope, expectedResult, true);
-               long withoutBloomFilterCost = hybridHashJoin(buildSize, 
buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
-               
-               System.out.println("HybridHashJoin2:");
-               System.out.println("Build input size: " + 100 * buildSize);
-               System.out.println("Probe input size: " + 100 * probeSize);
-               System.out.println("Available memory: " + 
this.memManager.getMemorySize());
-               System.out.println("Probe record be filtered before spill: " + 
(1 - (double)probeScope / buildScope) * 100 + "% percent.");
-               System.out.println(String.format("Cost: without bloom 
filter(%d), with bloom filter(%d)", withoutBloomFilterCost, 
withBloomFilterCost));
-       }
-       
-       @Test
-       public void compareMutableHashTablePerformance2() throws IOException {
-               // ----------------------------------------------80% filtered 
during probe spill phase-----------------------------------------
-               // create a build input with 1000000 records with key spread 
between [0 -- 5000000] with step of 5 for nearby records.
-               int buildSize = 1000000;
-               int buildStep = 5;
-               int buildScope = buildStep * buildSize;
-               // create a probe input with 5000000 records with key spread 
between [0 -- 1000000] with distance of 1 for nearby records.
-               int probeSize = 5000000;
-               int probeStep = 1;
-               int probeScope = buildSize;
-               
-               int expectedResult = 1000000;
-               
-               long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, 
buildScope, probeSize, probeStep, probeScope, expectedResult, true);
-               long withoutBloomFilterCost = hybridHashJoin(buildSize, 
buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
-               
-               System.out.println("HybridHashJoin3:");
-               System.out.println("Build input size: " + 100 * buildSize);
-               System.out.println("Probe input size: " + 100 * probeSize);
-               System.out.println("Available memory: " + 
this.memManager.getMemorySize());
-               System.out.println("Probe record be filtered before spill: " + 
(1 - (double)probeScope / buildScope) * 100 + "% percent.");
-               System.out.println(String.format("Cost: without bloom 
filter(%d), with bloom filter(%d)", withoutBloomFilterCost, 
withBloomFilterCost));
-       }
-       
-       @Test
-       public void compareMutableHashTablePerformance3() throws IOException {
-               // ----------------------------------------------50% filtered 
during probe spill phase-------------------------------------------------
-               // create a build input with 1000000 records with key spread 
between [0 -- 2000000] with step of 2 for nearby records.
-               int buildSize = 1000000;
-               int buildStep = 2;
-               int buildScope = buildStep * buildSize;
-               // create a probe input with 5000000 records with key spread 
between [0 -- 1000000] with distance of 1 for nearby records.
-               int probeSize = 5000000;
-               int probeStep = 1;
-               int probeScope = buildSize;
-               
-               int expectedResult = 2500000;
-               
-               long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
-               long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
-               
-               System.out.println("HybridHashJoin4:");
-               System.out.println("Build input size: " + 100 * buildSize);
-               System.out.println("Probe input size: " + 100 * probeSize);
-               System.out.println("Available memory: " + this.memManager.getMemorySize());
-               System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
-               System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
-       }
-       
-       @Test
-       public void compareMutableHashTablePerformance4() throws IOException {
-               // ----------------------------------------------0% filtered during probe spill phase-----------------------------------------
-               // create a build input with 1000000 records with key spread between [0 -- 1000000] with step of 1 for nearby records.
-               int buildSize = 1000000;
-               int buildStep = 1;
-               int buildScope = buildStep * buildSize;
-               // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
-               int probeSize = 5000000;
-               int probeStep = 1;
-               int probeScope = buildSize;
-               
-               int expectedResult = probeSize / buildStep;
-               
-               long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
-               long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
-               
-               System.out.println("HybridHashJoin5:");
-               System.out.println("Build input size: " + 100 * buildSize);
-               System.out.println("Probe input size: " + 100 * probeSize);
-               System.out.println("Available memory: " + this.memManager.getMemorySize());
-               System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
-               System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
-       }
-       
-       private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize,
-               int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException {
-               
-               InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
-               InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
-               
-               // allocate the memory for the HashTable
-               List<MemorySegment> memSegments;
-               try {
-                       // 33 is minimum number of pages required to perform hash join this inputs
-                       memSegments = this.memManager.allocatePages(MEM_OWNER, (int) (this.memManager.getMemorySize() / this.memManager.getPageSize()));
-               } catch (MemoryAllocationException maex) {
-                       fail("Memory for the Join could not be provided.");
-                       return -1;
-               }
-               
-               // ----------------------------------------------------------------------------------------
-               
-               long start = System.currentTimeMillis();
-               final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
-                       this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
-                       this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
-                       memSegments, ioManager, enableBloomFilter);
-               join.open(buildIterator, probeIterator);
-               
-               final StringPair recordReuse = new StringPair();
-               int numRecordsInJoinResult = 0;
-               
-               while (join.nextRecord()) {
-                       MutableHashTable.HashBucketIterator<StringPair, StringPair> buildSide = join.getBuildSideIterator();
-                       while (buildSide.next(recordReuse) != null) {
-                               numRecordsInJoinResult++;
-                       }
-               }
-               Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult);
-               
-               join.close();
-               long cost = System.currentTimeMillis() - start;
-               // ----------------------------------------------------------------------------------------
-               
-               this.memManager.release(join.getFreedMemory());
-               return cost;
-       }
-       
-       
-       static class InputIterator implements MutableObjectIterator<StringPair> {
-               
-               private int numLeft;
-               private int distance;
-               private int scope;
-               
-               public InputIterator(int size, int distance, int scope) {
-                       this.numLeft = size;
-                       this.distance = distance;
-                       this.scope = scope;
-               }
-               
-               @Override
-               public StringPair next(StringPair reuse) throws IOException {
-                       if (this.numLeft > 0) {
-                               numLeft--;
-                               int currentKey = (numLeft * distance) % scope;
-                               reuse.setKey(Integer.toString(currentKey));
-                               reuse.setValue(COMMENT);
-                               return reuse;
-                       } else {
-                               return null;
-                       }
-               }
-               
-               @Override
-               public StringPair next() throws IOException {
-                       return next(new StringPair());
-               }
-       }
-}

Reply via email to