[NO ISSUE][RT] Replace HybridHashJoin with OptimizedHybridHashJoin

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
The HybridHashJoinOperatorDescriptor is an old implementation that
hasn't been used in the runtime, and it lacks the necessary
documentation and memory management. The
OptimizedHybridHashJoinOperatorDescriptor serves the same purpose, so
we should use it instead and avoid maintaining the old one.
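
For reference, a minimal sketch of the resulting logic in
HybridHashJoinPOperator.contributeRuntimeOperator (simplified, variable
names as in the patch below): the per-family null check and the legacy
fallback are removed, and the optimized descriptor is always generated.

    // Sketch only: the optimized hybrid hash join is now emitted unconditionally.
    IOperatorDescriptor opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight,
            hashFunFamilies, comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
    opDesc.setSourceLocation(op.getSourceLocation());
    contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);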

Change-Id: I6ed612cc233af1b78d453c7b711077b82e721e82
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3023
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Xikui Wang <xkk...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0dec33a9
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0dec33a9
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0dec33a9

Branch: refs/heads/master
Commit: 0dec33a9644d4b45ae809c3c43eb304b3ae05850
Parents: 7d75792
Author: Xikui Wang <xkk...@gmail.com>
Authored: Thu Nov 15 13:32:01 2018 -0800
Committer: Xikui Wang <xkk...@gmail.com>
Committed: Thu Nov 15 14:59:11 2018 -0800

----------------------------------------------------------------------
 .../physical/HybridHashJoinPOperator.java       |  44 +-
 .../join/HybridHashJoinOperatorDescriptor.java  | 577 -------------------
 ...TPCHCustomerOptimizedHybridHashJoinTest.java | 245 --------
 .../TPCHCustomerOrderHashJoinTest.java          | 545 ++++++++----------
 4 files changed, 247 insertions(+), 1164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0dec33a9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 45ec44b..091cc44 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -45,7 +45,6 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
@@ -55,7 +54,6 @@ import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -65,7 +63,6 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
     // The maximum number of in-memory frames that this hash join can use.
     private final int memSizeInFrames;
     private final int maxInputBuildSizeInFrames;
-    private final int aveRecordsPerFrame;
     private final double fudgeFactor;
 
     private static final Logger LOGGER = LogManager.getLogger();
@@ -76,7 +73,6 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
         super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
         this.memSizeInFrames = memSizeInFrames;
         this.maxInputBuildSizeInFrames = maxInputSizeInFrames;
-        this.aveRecordsPerFrame = aveRecordsPerFrame;
         this.fudgeFactor = fudgeFactor;
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("HybridHashJoinPOperator constructed with: JoinKind=" 
+ kind + ", JoinPartitioningType="
@@ -117,8 +113,6 @@ public class HybridHashJoinPOperator extends 
AbstractHashJoinPOperator {
         int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, 
inputSchemas[0]);
         int[] keysRight = 
JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
         IVariableTypeEnvironment env = context.getTypeEnvironment(op);
-        IBinaryHashFunctionFactory[] hashFunFactories =
-                JobGenHelper.variablesToBinaryHashFunctionFactories(keysLeftBranch, env, context);
         IBinaryHashFunctionFamily[] hashFunFamilies =
                 JobGenHelper.variablesToBinaryHashFunctionFamilies(keysLeftBranch, env, context);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
@@ -138,21 +132,9 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IOperatorDescriptor opDesc;
-        boolean optimizedHashJoin = true;
-        for (IBinaryHashFunctionFamily family : hashFunFamilies) {
-            if (family == null) {
-                optimizedHashJoin = false;
-                break;
-            }
-        }
 
-        if (optimizedHashJoin) {
-            opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFamilies,
-                    comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
-        } else {
-            opDesc = generateHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFactories,
-                    comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
-        }
+        opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFamilies,
+                comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
         opDesc.setSourceLocation(op.getSourceLocation());
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
@@ -162,28 +144,6 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
         builder.contributeGraphEdge(src2, 0, op, 1);
     }
 
-    private IOperatorDescriptor generateHashJoinRuntime(JobGenContext context, IOperatorSchema[] inputSchemas,
-            int[] keysLeft, int[] keysRight, IBinaryHashFunctionFactory[] hashFunFactories,
-            IBinaryComparatorFactory[] comparatorFactories, IPredicateEvaluatorFactory predEvaluatorFactory,
-            RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) throws AlgebricksException {
-        switch (kind) {
-            case INNER:
-                return new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames,
-                        aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories,
-                        comparatorFactories, recDescriptor, predEvaluatorFactory, false, null);
-            case LEFT_OUTER:
-                IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
-                for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-                    nonMatchWriterFactories[j] = context.getMissingWriterFactory();
-                }
-                return new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames,
-                        aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories,
-                        comparatorFactories, recDescriptor, predEvaluatorFactory, true, nonMatchWriterFactories);
-            default:
-                throw new NotImplementedException();
-        }
-    }
-
     private IOperatorDescriptor generateOptimizedHashJoinRuntime(JobGenContext context, IOperatorSchema[] inputSchemas,
             int[] keysLeft, int[] keysRight, IBinaryHashFunctionFamily[] hashFunFamilies,
             IBinaryComparatorFactory[] comparatorFactories, IPredicateEvaluatorFactory predEvaluatorFactory,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0dec33a9/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
deleted file mode 100644
index 034b054..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ /dev/null
@@ -1,577 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.join;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.IMissingWriter;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import 
org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import 
org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
-import org.apache.hyracks.dataflow.std.structures.SimpleSerializableHashTable;
-import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
-
-public class HybridHashJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
-    private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
-    private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
-
-    private final int memsize;
-    private static final long serialVersionUID = 1L;
-    private final int inputsize0;
-    private final double factor;
-    private final int recordsPerFrame;
-    private final int[] keys0;
-    private final int[] keys1;
-    private final IBinaryHashFunctionFactory[] hashFunctionFactories;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final IPredicateEvaluatorFactory predEvaluatorFactory;
-    private final boolean isLeftOuter;
-    private final IMissingWriterFactory[] nonMatchWriterFactories1;
-
-    /**
-     * @param spec
-     * @param memsize
-     *            in frames
-     * @param inputsize0
-     *            in frames
-     * @param recordsPerFrame
-     * @param factor
-     * @param keys0
-     * @param keys1
-     * @param hashFunctionFactories
-     * @param comparatorFactories
-     * @param recordDescriptor
-     */
-    public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, 
int memsize, int inputsize0,
-            int recordsPerFrame, double factor, int[] keys0, int[] keys1,
-            IBinaryHashFunctionFactory[] hashFunctionFactories, 
IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory 
predEvalFactory, boolean isLeftOuter,
-            IMissingWriterFactory[] nullWriterFactories1) {
-        super(spec, 2, 1);
-        this.memsize = memsize;
-        this.inputsize0 = inputsize0;
-        this.factor = factor;
-        this.recordsPerFrame = recordsPerFrame;
-        this.keys0 = keys0;
-        this.keys1 = keys1;
-        this.hashFunctionFactories = hashFunctionFactories;
-        this.comparatorFactories = comparatorFactories;
-        this.predEvaluatorFactory = predEvalFactory;
-        this.isLeftOuter = isLeftOuter;
-        this.nonMatchWriterFactories1 = nullWriterFactories1;
-        outRecDescs[0] = recordDescriptor;
-    }
-
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        ActivityId p1Aid = new ActivityId(odId, 
BUILD_AND_PARTITION_ACTIVITY_ID);
-        ActivityId p2Aid = new ActivityId(odId, 
PARTITION_AND_JOIN_ACTIVITY_ID);
-        BuildAndPartitionActivityNode phase1 = new 
BuildAndPartitionActivityNode(p1Aid, p2Aid);
-        PartitionAndJoinActivityNode phase2 = new 
PartitionAndJoinActivityNode(p2Aid, p1Aid);
-
-        builder.addActivity(this, phase1);
-        builder.addSourceEdge(1, phase1, 0);
-
-        builder.addActivity(this, phase2);
-        builder.addSourceEdge(0, phase2, 0);
-
-        builder.addBlockingEdge(phase1, phase2);
-
-        builder.addTargetEdge(0, phase2, 0);
-    }
-
-    public static class BuildAndPartitionTaskState extends AbstractStateObject 
{
-        private RunFileWriter[] fWriters;
-        private InMemoryHashJoin joiner;
-        private int nPartitions;
-        private int memoryForHashtable;
-
-        public BuildAndPartitionTaskState() {
-        }
-
-        private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
-            super(jobId, taskId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
-
-    }
-
-    private class BuildAndPartitionActivityNode extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        private final ActivityId joinAid;
-
-        public BuildAndPartitionActivityNode(ActivityId id, ActivityId 
joinAid) {
-            super(id);
-            this.joinAid = joinAid;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final 
IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int 
partition, final int nPartitions)
-                throws HyracksDataException {
-            final RecordDescriptor rd0 = 
recordDescProvider.getInputRecordDescriptor(joinAid, 0);
-            final RecordDescriptor rd1 = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final IBinaryComparator[] comparators = new 
IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = 
comparatorFactories[i].createBinaryComparator();
-            }
-            final IMissingWriter[] nullWriters1 =
-                    isLeftOuter ? new 
IMissingWriter[nonMatchWriterFactories1.length] : null;
-            if (isLeftOuter) {
-                for (int i = 0; i < nonMatchWriterFactories1.length; i++) {
-                    nullWriters1[i] = 
nonMatchWriterFactories1[i].createMissingWriter();
-                }
-            }
-            final IPredicateEvaluator predEvaluator =
-                    (predEvaluatorFactory == null ? null : 
predEvaluatorFactory.createPredicateEvaluator());
-
-            IOperatorNodePushable op = new 
AbstractUnaryInputSinkOperatorNodePushable() {
-                private BuildAndPartitionTaskState state = new 
BuildAndPartitionTaskState(
-                        ctx.getJobletContext().getJobId(), new 
TaskId(getActivityId(), partition));
-                private final FrameTupleAccessor accessorBuild = new 
FrameTupleAccessor(rd1);
-                private final ITuplePartitionComputer hpcBuild =
-                        new FieldHashPartitionComputerFactory(keys1, 
hashFunctionFactories).createPartitioner(ctx);
-                private final FrameTupleAppender appender = new 
FrameTupleAppender();
-                private final FrameTupleAppender ftappender = new 
FrameTupleAppender();
-                private IFrame[] bufferForPartitions;
-                private final IFrame inBuffer = new VSizeFrame(ctx);
-
-                @Override
-                public void close() throws HyracksDataException {
-                    if (state.memoryForHashtable != 0) {
-                        build(inBuffer.getBuffer());
-                    }
-
-                    for (int i = 0; i < state.nPartitions; i++) {
-                        ByteBuffer buf = bufferForPartitions[i].getBuffer();
-                        accessorBuild.reset(buf);
-                        if (accessorBuild.getTupleCount() > 0) {
-                            write(i, buf);
-                        }
-                        closeWriter(i);
-                    }
-
-                    ctx.setStateObject(state);
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
-
-                    if (state.memoryForHashtable != memsize - 2) {
-                        accessorBuild.reset(buffer);
-                        int tCount = accessorBuild.getTupleCount();
-                        for (int i = 0; i < tCount; ++i) {
-                            int entry;
-                            if (state.memoryForHashtable == 0) {
-                                entry = hpcBuild.partition(accessorBuild, i, 
state.nPartitions);
-                                boolean newBuffer = false;
-                                IFrame bufBi = bufferForPartitions[entry];
-                                while (true) {
-                                    appender.reset(bufBi, newBuffer);
-                                    if (appender.append(accessorBuild, i)) {
-                                        break;
-                                    } else {
-                                        write(entry, bufBi.getBuffer());
-                                        bufBi.reset();
-                                        newBuffer = true;
-                                    }
-                                }
-                            } else {
-                                entry = hpcBuild.partition(accessorBuild, i, 
(int) (inputsize0 * factor / nPartitions));
-                                if (entry < state.memoryForHashtable) {
-                                    while (true) {
-                                        if (!ftappender.append(accessorBuild, 
i)) {
-                                            build(inBuffer.getBuffer());
-
-                                            ftappender.reset(inBuffer, true);
-                                        } else {
-                                            break;
-                                        }
-                                    }
-                                } else {
-                                    entry %= state.nPartitions;
-                                    boolean newBuffer = false;
-                                    IFrame bufBi = bufferForPartitions[entry];
-                                    while (true) {
-                                        appender.reset(bufBi, newBuffer);
-                                        if (appender.append(accessorBuild, i)) 
{
-                                            break;
-                                        } else {
-                                            write(entry, bufBi.getBuffer());
-                                            bufBi.reset();
-                                            newBuffer = true;
-                                        }
-                                    }
-                                }
-                            }
-
-                        }
-                    } else {
-                        build(buffer);
-                    }
-
-                }
-
-                private void build(ByteBuffer inBuffer) throws 
HyracksDataException {
-                    ByteBuffer copyBuffer = 
ctx.allocateFrame(inBuffer.capacity());
-                    FrameUtils.copyAndFlip(inBuffer, copyBuffer);
-                    state.joiner.build(copyBuffer);
-                }
-
-                @Override
-                public void open() throws HyracksDataException {
-                    if (memsize > 1) {
-                        if (memsize > inputsize0) {
-                            state.nPartitions = 0;
-                        } else {
-                            state.nPartitions =
-                                    (int) (Math.ceil((inputsize0 * factor / 
nPartitions - memsize) / (memsize - 1)));
-                        }
-                        if (state.nPartitions <= 0) {
-                            // becomes in-memory HJ
-                            state.memoryForHashtable = memsize - 2;
-                            state.nPartitions = 0;
-                        } else {
-                            state.memoryForHashtable = memsize - 
state.nPartitions - 2;
-                            if (state.memoryForHashtable < 0) {
-                                state.memoryForHashtable = 0;
-                                state.nPartitions = (int) 
Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
-                            }
-                        }
-                    } else {
-                        throw new HyracksDataException("not enough memory");
-                    }
-
-                    ITuplePartitionComputer hpc0 =
-                            new FieldHashPartitionComputerFactory(keys0, 
hashFunctionFactories).createPartitioner(ctx);
-                    ITuplePartitionComputer hpc1 =
-                            new FieldHashPartitionComputerFactory(keys1, 
hashFunctionFactories).createPartitioner(ctx);
-                    int tableSize = (int) (state.memoryForHashtable * 
recordsPerFrame * factor);
-                    ISerializableTable table = new 
SimpleSerializableHashTable(tableSize, ctx);
-                    state.joiner =
-                            new InMemoryHashJoin(ctx, new 
FrameTupleAccessor(rd0), hpc0, new FrameTupleAccessor(rd1),
-                                    rd1, hpc1, new 
FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
-                                    nullWriters1, table, predEvaluator, null);
-                    bufferForPartitions = new IFrame[state.nPartitions];
-                    state.fWriters = new RunFileWriter[state.nPartitions];
-                    for (int i = 0; i < state.nPartitions; i++) {
-                        bufferForPartitions[i] = new VSizeFrame(ctx);
-                    }
-
-                    ftappender.reset(inBuffer, true);
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                }
-
-                private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = state.fWriters[i];
-                    if (writer != null) {
-                        writer.close();
-                    }
-                }
-
-                private void write(int i, ByteBuffer head) throws 
HyracksDataException {
-                    RunFileWriter writer = state.fWriters[i];
-                    if (writer == null) {
-                        FileReference file = ctx.getJobletContext()
-                                
.createManagedWorkspaceFile(BuildAndPartitionActivityNode.class.getSimpleName());
-                        writer = new RunFileWriter(file, ctx.getIoManager());
-                        writer.open();
-                        state.fWriters[i] = writer;
-                    }
-                    writer.nextFrame(head);
-                }
-            };
-            return op;
-        }
-    }
-
-    private class PartitionAndJoinActivityNode extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        private final ActivityId buildAid;
-
-        public PartitionAndJoinActivityNode(ActivityId id, ActivityId 
buildAid) {
-            super(id);
-            this.buildAid = buildAid;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final 
IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int 
partition, final int nPartitions)
-                throws HyracksDataException {
-            final RecordDescriptor rd0 = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final RecordDescriptor rd1 = 
recordDescProvider.getInputRecordDescriptor(buildAid, 0);
-            final IBinaryComparator[] comparators = new 
IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = 
comparatorFactories[i].createBinaryComparator();
-            }
-            final IMissingWriter[] nullWriters1 =
-                    isLeftOuter ? new 
IMissingWriter[nonMatchWriterFactories1.length] : null;
-            if (isLeftOuter) {
-                for (int i = 0; i < nonMatchWriterFactories1.length; i++) {
-                    nullWriters1[i] = 
nonMatchWriterFactories1[i].createMissingWriter();
-                }
-            }
-            final IPredicateEvaluator predEvaluator =
-                    (predEvaluatorFactory == null ? null : 
predEvaluatorFactory.createPredicateEvaluator());
-
-            IOperatorNodePushable op = new 
AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-                private BuildAndPartitionTaskState state;
-                private final FrameTupleAccessor accessorProbe = new 
FrameTupleAccessor(rd0);
-                private final ITuplePartitionComputerFactory hpcf0 =
-                        new FieldHashPartitionComputerFactory(keys0, 
hashFunctionFactories);
-                private final ITuplePartitionComputerFactory hpcf1 =
-                        new FieldHashPartitionComputerFactory(keys1, 
hashFunctionFactories);
-                private final ITuplePartitionComputer hpcProbe = 
hpcf0.createPartitioner(ctx);
-
-                private final FrameTupleAppender appender = new 
FrameTupleAppender();
-                private final FrameTupleAppender ftap = new 
FrameTupleAppender();
-                private final IFrame inBuffer = new VSizeFrame(ctx);
-                private final IFrame outBuffer = new VSizeFrame(ctx);
-                private RunFileWriter[] buildWriters;
-                private RunFileWriter[] probeWriters;
-                private IFrame[] bufferForPartitions;
-
-                @Override
-                public void open() throws HyracksDataException {
-                    writer.open();
-                    state = (BuildAndPartitionTaskState) ctx.getStateObject(
-                            new TaskId(new ActivityId(getOperatorId(), 
BUILD_AND_PARTITION_ACTIVITY_ID), partition));
-                    buildWriters = state.fWriters;
-                    probeWriters = new RunFileWriter[state.nPartitions];
-                    bufferForPartitions = new IFrame[state.nPartitions];
-                    for (int i = 0; i < state.nPartitions; i++) {
-                        bufferForPartitions[i] = new VSizeFrame(ctx);
-                    }
-                    appender.reset(outBuffer, true);
-                    ftap.reset(inBuffer, true);
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
-                    if (state.memoryForHashtable != memsize - 2) {
-                        accessorProbe.reset(buffer);
-                        int tupleCount0 = accessorProbe.getTupleCount();
-                        for (int i = 0; i < tupleCount0; ++i) {
-
-                            int entry;
-                            if (state.memoryForHashtable == 0) {
-                                entry = hpcProbe.partition(accessorProbe, i, 
state.nPartitions);
-                                boolean newBuffer = false;
-                                IFrame outbuf = bufferForPartitions[entry];
-                                while (true) {
-                                    appender.reset(outbuf, newBuffer);
-                                    if (appender.append(accessorProbe, i)) {
-                                        break;
-                                    } else {
-                                        write(entry, outbuf.getBuffer());
-                                        outbuf.reset();
-                                        newBuffer = true;
-                                    }
-                                }
-                            } else {
-                                entry = hpcProbe.partition(accessorProbe, i, 
(int) (inputsize0 * factor / nPartitions));
-                                if (entry < state.memoryForHashtable) {
-                                    while (true) {
-                                        if (!ftap.append(accessorProbe, i)) {
-                                            
state.joiner.join(inBuffer.getBuffer(), writer);
-                                            ftap.reset(inBuffer, true);
-                                        } else {
-                                            break;
-                                        }
-                                    }
-
-                                } else {
-                                    entry %= state.nPartitions;
-                                    boolean newBuffer = false;
-                                    IFrame outbuf = bufferForPartitions[entry];
-                                    while (true) {
-                                        appender.reset(outbuf, newBuffer);
-                                        if (appender.append(accessorProbe, i)) 
{
-                                            break;
-                                        } else {
-                                            write(entry, outbuf.getBuffer());
-                                            outbuf.reset();
-                                            newBuffer = true;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    } else {
-                        state.joiner.join(buffer, writer);
-                    }
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    try {
-                        try {
-                            state.joiner.join(inBuffer.getBuffer(), writer);
-                            state.joiner.completeJoin(writer);
-                        } finally {
-                            state.joiner.releaseMemory();
-                        }
-                        ITuplePartitionComputer hpcRep0 =
-                                new 
RepartitionComputerFactory(state.nPartitions, hpcf0).createPartitioner(ctx);
-                        ITuplePartitionComputer hpcRep1 =
-                                new 
RepartitionComputerFactory(state.nPartitions, hpcf1).createPartitioner(ctx);
-                        if (state.memoryForHashtable != memsize - 2) {
-                            for (int i = 0; i < state.nPartitions; i++) {
-                                ByteBuffer buf = 
bufferForPartitions[i].getBuffer();
-                                accessorProbe.reset(buf);
-                                if (accessorProbe.getTupleCount() > 0) {
-                                    write(i, buf);
-                                }
-                                closeWriter(i);
-                            }
-
-                            inBuffer.reset();
-                            int tableSize = -1;
-                            if (state.memoryForHashtable == 0) {
-                                tableSize = (int) (state.nPartitions * 
recordsPerFrame * factor);
-                            } else {
-                                tableSize = (int) (memsize * recordsPerFrame * 
factor);
-                            }
-                            ISerializableTable table = new 
SimpleSerializableHashTable(tableSize, ctx);
-                            for (int partitionid = 0; partitionid < 
state.nPartitions; partitionid++) {
-                                RunFileWriter buildWriter = 
buildWriters[partitionid];
-                                RunFileWriter probeWriter = 
probeWriters[partitionid];
-                                if ((buildWriter == null && !isLeftOuter) || 
probeWriter == null) {
-                                    continue;
-                                }
-                                table.reset();
-                                InMemoryHashJoin joiner = new 
InMemoryHashJoin(ctx, new FrameTupleAccessor(rd0),
-                                        hpcRep0, new FrameTupleAccessor(rd1), 
rd1, hpcRep1,
-                                        new FrameTuplePairComparator(keys0, 
keys1, comparators), isLeftOuter,
-                                        nullWriters1, table, predEvaluator, 
null);
-
-                                if (buildWriter != null) {
-                                    RunFileReader buildReader = 
buildWriter.createDeleteOnCloseReader();
-                                    try {
-                                        buildReader.open();
-                                        while 
(buildReader.nextFrame(inBuffer)) {
-                                            ByteBuffer copyBuffer = 
ctx.allocateFrame(inBuffer.getFrameSize());
-                                            
FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
-                                            joiner.build(copyBuffer);
-                                            inBuffer.reset();
-                                        }
-                                    } finally {
-                                        buildReader.close();
-                                    }
-                                }
-
-                                // probe
-                                RunFileReader probeReader = 
probeWriter.createDeleteOnCloseReader();
-                                try {
-                                    probeReader.open();
-                                    try {
-                                        while 
(probeReader.nextFrame(inBuffer)) {
-                                            joiner.join(inBuffer.getBuffer(), 
writer);
-                                            inBuffer.reset();
-                                        }
-                                        joiner.completeJoin(writer);
-                                    } finally {
-                                        joiner.releaseMemory();
-                                    }
-                                } finally {
-                                    probeReader.close();
-                                }
-                            }
-                        }
-                    } finally {
-                        writer.close();
-                    }
-                }
-
-                private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = probeWriters[i];
-                    if (writer != null) {
-                        writer.close();
-                    }
-                }
-
-                private void write(int i, ByteBuffer head) throws 
HyracksDataException {
-                    RunFileWriter writer = probeWriters[i];
-                    if (writer == null) {
-                        FileReference file =
-                                
ctx.createManagedWorkspaceFile(PartitionAndJoinActivityNode.class.getSimpleName());
-                        writer = new RunFileWriter(file, ctx.getIoManager());
-                        writer.open();
-                        probeWriters[i] = writer;
-                    }
-                    writer.nextFrame(head);
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    writer.fail();
-                }
-            };
-            return op;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0dec33a9/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
deleted file mode 100644
index 289f8ae..0000000
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.tests.integration;
-
-import java.io.File;
-import java.util.Arrays;
-
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.io.ManagedFileSplit;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import 
org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
-import 
org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import 
org.apache.hyracks.dataflow.std.connectors.MToNBroadcastConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
-import 
org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
-import org.junit.Test;
-
-public class TPCHCustomerOptimizedHybridHashJoinTest extends 
AbstractIntegrationTest {
-
-    private static boolean DEBUG = false;
-
-    static RecordDescriptor custDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
-            new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-            new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-            new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-            new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
-
-    static RecordDescriptor ordersDesc =
-            new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
-
-    static RecordDescriptor custOrderJoinDesc =
-            new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
-
-    static IValueParserFactory[] custValueParserFactories = new 
IValueParserFactory[custDesc.getFieldCount()];
-    static IValueParserFactory[] orderValueParserFactories = new 
IValueParserFactory[ordersDesc.getFieldCount()];
-
-    static {
-        Arrays.fill(custValueParserFactories, 
UTF8StringParserFactory.INSTANCE);
-        Arrays.fill(orderValueParserFactories, 
UTF8StringParserFactory.INSTANCE);
-    }
-
-    private IOperatorDescriptor getPrinter(JobSpecification spec, String path) 
{
-        IFileSplitProvider outputSplitProvider =
-                new ConstantFileSplitProvider(new FileSplit[] { new 
ManagedFileSplit(NC1_ID, path) });
-
-        return DEBUG ? new PlainFileWriterOperatorDescriptor(spec, 
outputSplitProvider, "|")
-                : new NullSinkOperatorDescriptor(spec);
-    }
-
-    @Test
-    public void customerOrderCIDHybridHashJoin_Case1() throws Exception {
-        JobSpecification spec = new JobSpecification();
-        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
-                "data" + File.separator + "tpch0.001" + File.separator + 
"customer4.tbl") };
-        IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new ManagedFileSplit(NC2_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "orders4.tbl") };
-
-        IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
-        FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID);
-
-        OptimizedHybridHashJoinOperatorDescriptor join = new 
OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 243,
-                1.2, new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFamily[] { 
UTF8StringBinaryHashFunctionFamily.INSTANCE },
-                new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc,
-                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 0, 1),
-                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 1, 0),
-                null);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, 
NC1_ID);
-
-        String path = getClass().getName() + File.separator + "case1";
-        IOperatorDescriptor printer = getPrinter(spec, path);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, 
NC1_ID);
-
-        IConnectorDescriptor custJoinConn = new 
OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor ordJoinConn = new 
MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new 
OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-        System.out.println("output to " + path);
-    }
-
-    @Test
-    public void customerOrderCIDHybridHashJoin_Case2() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
-                "data" + File.separator + "tpch0.001" + File.separator + 
"customer3.tbl") };
-        IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new ManagedFileSplit(NC2_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "orders4.tbl") };
-
-        IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
-
-        FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID);
-
-        OptimizedHybridHashJoinOperatorDescriptor join = new 
OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 122,
-                1.2, new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFamily[] { 
UTF8StringBinaryHashFunctionFamily.INSTANCE },
-                new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc,
-                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 0, 1),
-                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 1, 0),
-                null);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, 
NC1_ID);
-
-        String path = getClass().getName() + File.separator + "case2";
-        IOperatorDescriptor printer = getPrinter(spec, path);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, 
NC1_ID);
-
-        IConnectorDescriptor custJoinConn = new 
OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor ordJoinConn = new 
MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new 
OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-        System.out.println("output to " + path);
-    }
-
-    @Test
-    public void customerOrderCIDHybridHashJoin_Case3() throws Exception {
-
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
-                "data" + File.separator + "tpch0.001" + File.separator + 
"customer3.tbl") };
-        IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new ManagedFileSplit(NC2_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "orders1.tbl") };
-
-        IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
-
-        FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID);
-
-        OptimizedHybridHashJoinOperatorDescriptor join = new 
OptimizedHybridHashJoinOperatorDescriptor(spec, 6, 122,
-                1.2, new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFamily[] { 
UTF8StringBinaryHashFunctionFamily.INSTANCE },
-                new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc,
-                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 0, 1),
-                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 1, 0),
-                null);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, 
NC1_ID);
-
-        String path = getClass().getName() + File.separator + "case3";
-        IOperatorDescriptor printer = getPrinter(spec, path);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, 
NC1_ID);
-
-        IConnectorDescriptor custJoinConn = new 
OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor ordJoinConn = new 
MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new 
OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-        System.out.println("output to " + path);
-    }
-
-}
