Taewoo Kim has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1353
Change subject: ASTERIXDB-1736: Remove Grace Hash Join and Hybrid Hash Join (not being used)
......................................................................
ASTERIXDB-1736: Remove Grace Hash Join and Hybrid Hash Join (not being used)
- Removed Grace Hash Join and Hybrid Hash Join, which are not currently used, since we always use Optimized Hybrid Hash Join.
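For context, the join-strategy decision in HybridHashJoinPOperator collapses to a single
path. A minimal sketch of the resulting logic, assembled from the new hunks below (the
enclosing method is assumed here to be contributeRuntimeOperator, abbreviated):

    // With the fallback join gone, a key variable without a hash function
    // family is now an error instead of a signal to fall back to the plain
    // HybridHashJoinOperatorDescriptor.
    for (IBinaryHashFunctionFamily family : hashFunFamilies) {
        if (family == null) {
            throw new AlgebricksException("A hash function should be assigned for each key variable.");
        }
    }
    // Unconditionally generate the optimized hybrid hash join runtime.
    opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight,
            hashFunFamilies, comparatorFactories, predEvaluatorFactory, recDescriptor, spec);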
Change-Id: I16e9e4c73d7851f18a48c2715a6bc5c903b74eba
---
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java
D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
M hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
9 files changed, 5 insertions(+), 2,165 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/53/1353/1
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 248bc4f..fe82f68 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -46,7 +46,6 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
@@ -56,32 +55,28 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
private final int memSizeInFrames;
private final int maxInputBuildSizeInFrames;
- private final int aveRecordsPerFrame;
private final double fudgeFactor;
private static final Logger LOGGER = Logger.getLogger(HybridHashJoinPOperator.class.getName());
public HybridHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
- int memSizeInFrames, int maxInputSizeInFrames, int aveRecordsPerFrame, double fudgeFactor) {
+ int memSizeInFrames, int maxInputSizeInFrames, double fudgeFactor) {
super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
this.memSizeInFrames = memSizeInFrames;
this.maxInputBuildSizeInFrames = maxInputSizeInFrames;
- this.aveRecordsPerFrame = aveRecordsPerFrame;
this.fudgeFactor = fudgeFactor;
LOGGER.fine("HybridHashJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType="
+ partitioningType + ", List<LogicalVariable>=" + sideLeftOfEqualities + ", List<LogicalVariable>="
+ sideRightOfEqualities + ", int memSizeInFrames=" + memSizeInFrames + ", int maxInputSize0InFrames="
- + maxInputSizeInFrames + ", int aveRecordsPerFrame=" + aveRecordsPerFrame + ", double fudgeFactor="
- + fudgeFactor + ".");
+ maxInputSizeInFrames + ", double fudgeFactor=" + fudgeFactor + ".");
}
@Override
@@ -114,8 +109,6 @@
int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
IVariableTypeEnvironment env = context.getTypeEnvironment(op);
- IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper
- .variablesToBinaryHashFunctionFactories(keysLeftBranch, env, context);
IBinaryHashFunctionFamily[] hashFunFamilies =
JobGenHelper.variablesToBinaryHashFunctionFamilies(keysLeftBranch, env, context);
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
@@ -135,58 +128,20 @@
propagatedSchema, context);
IOperatorDescriptorRegistry spec = builder.getJobSpec();
IOperatorDescriptor opDesc;
- boolean optimizedHashJoin = true;
for (IBinaryHashFunctionFamily family : hashFunFamilies) {
if (family == null) {
- optimizedHashJoin = false;
- break;
+ throw new AlgebricksException("A hash function should be assigned for each key variable.");
}
}
- if (optimizedHashJoin) {
- opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFamilies,
- comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
- } else {
- opDesc = generateHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFactories,
- comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
- }
+ opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFamilies,
+ comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
ILogicalOperator src1 = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src1, 0, op, 0);
ILogicalOperator src2 = op.getInputs().get(1).getValue();
builder.contributeGraphEdge(src2, 0, op, 1);
- }
-
- private IOperatorDescriptor generateHashJoinRuntime(JobGenContext context, IOperatorSchema[] inputSchemas,
- int[] keysLeft, int[] keysRight, IBinaryHashFunctionFactory[] hashFunFactories,
- IBinaryComparatorFactory[] comparatorFactories, IPredicateEvaluatorFactory predEvaluatorFactory,
- RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) throws AlgebricksException {
- IOperatorDescriptor opDesc;
- try {
- switch (kind) {
- case INNER:
- opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames,
- aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories,
- comparatorFactories, recDescriptor, predEvaluatorFactory, false, null);
- break;
- case LEFT_OUTER:
- IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1]
- .getSize()];
- for (int j = 0; j < nonMatchWriterFactories.length; j++) {
- nonMatchWriterFactories[j] = context.getMissingWriterFactory();
- }
- opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames,
- aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories,
- comparatorFactories, recDescriptor, predEvaluatorFactory, true, nonMatchWriterFactories);
- break;
- default:
- throw new NotImplementedException();
- }
- } catch (HyracksDataException e) {
- throw new AlgebricksException(e);
- }
- return opDesc;
}
private IOperatorDescriptor generateOptimizedHashJoinRuntime(JobGenContext context, IOperatorSchema[] inputSchemas,
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 3332836..f4c6134 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -24,7 +24,6 @@
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -95,7 +94,6 @@
op.setPhysicalOperator(new HybridHashJoinPOperator(op.getJoinKind(), partitioningType, sideLeft, sideRight,
context.getPhysicalOptimizationConfig().getMaxFramesForJoin(),
context.getPhysicalOptimizationConfig().getMaxFramesForJoinLeftInput(),
- context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame(),
context.getPhysicalOptimizationConfig().getFudgeFactor()));
if (partitioningType == JoinPartitioningType.BROADCAST) {
hybridToInMemHashJoin(op, context);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
deleted file mode 100644
index 2f7b1c2..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.join;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-
-public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final int RPARTITION_ACTIVITY_ID = 0;
- private static final int SPARTITION_ACTIVITY_ID = 1;
- private static final int JOIN_ACTIVITY_ID = 2;
-
- private static final long serialVersionUID = 1L;
- private final int[] keys0;
- private final int[] keys1;
- private final int inputsize0;
- private final int recordsPerFrame;
- private final int memsize;
- private final double factor;
- private final IBinaryHashFunctionFactory[] hashFunctionFactories;
- private final IBinaryComparatorFactory[] comparatorFactories;
- private final IPredicateEvaluatorFactory predEvaluatorFactory;
- private final boolean isLeftOuter;
- private final IMissingWriterFactory[] nullWriterFactories1;
-
- public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
- int recordsPerFrame, double factor, int[] keys0, int[] keys1,
- IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory) {
- super(spec, 2, 1);
- this.memsize = memsize;
- this.inputsize0 = inputsize0;
- this.recordsPerFrame = recordsPerFrame;
- this.factor = factor;
- this.keys0 = keys0;
- this.keys1 = keys1;
- this.hashFunctionFactories = hashFunctionFactories;
- this.comparatorFactories = comparatorFactories;
- this.predEvaluatorFactory = predEvalFactory;
- this.isLeftOuter = false;
- this.nullWriterFactories1 = null;
- recordDescriptors[0] = recordDescriptor;
- }
-
- public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
- int recordsPerFrame, double factor, int[] keys0, int[] keys1,
- IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1,
- IPredicateEvaluatorFactory predEvalFactory) {
- super(spec, 2, 1);
- this.memsize = memsize;
- this.inputsize0 = inputsize0;
- this.recordsPerFrame = recordsPerFrame;
- this.factor = factor;
- this.keys0 = keys0;
- this.keys1 = keys1;
- this.hashFunctionFactories = hashFunctionFactories;
- this.comparatorFactories = comparatorFactories;
- this.predEvaluatorFactory = predEvalFactory;
- this.isLeftOuter = isLeftOuter;
- this.nullWriterFactories1 = nullWriterFactories1;
- recordDescriptors[0] = recordDescriptor;
- }
-
- @Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- ActivityId rpartAid = new ActivityId(odId, RPARTITION_ACTIVITY_ID);
- HashPartitionActivityNode rpart = new HashPartitionActivityNode(rpartAid, keys0);
- ActivityId spartAid = new ActivityId(odId, SPARTITION_ACTIVITY_ID);
- HashPartitionActivityNode spart = new HashPartitionActivityNode(spartAid, keys1);
- JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, JOIN_ACTIVITY_ID), rpartAid, spartAid);
-
- builder.addActivity(this, rpart);
- builder.addSourceEdge(0, rpart, 0);
-
- builder.addActivity(this, spart);
- builder.addSourceEdge(1, spart, 0);
-
- builder.addActivity(this, join);
- builder.addBlockingEdge(rpart, spart);
- builder.addBlockingEdge(spart, join);
-
- builder.addTargetEdge(0, join, 0);
- }
-
- public int getMemorySize() {
- return memsize;
- }
-
- private class HashPartitionActivityNode extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
- private int[] keys;
-
- public HashPartitionActivityNode(ActivityId id, int[] keys) {
- super(id);
- this.keys = keys;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new GraceHashJoinPartitionBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition),
- keys, hashFunctionFactories, comparatorFactories,
- (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions)),
- recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
- }
- }
-
- private class JoinActivityNode extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- private final ActivityId rpartAid;
-
- private final ActivityId spartAid;
-
- public JoinActivityNode(ActivityId id, ActivityId rpartAid, ActivityId spartAid) {
- super(id);
- this.rpartAid = rpartAid;
- this.spartAid = spartAid;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
- final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(rpartAid, 0);
- final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(spartAid, 0);
- int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
- final IPredicateEvaluator predEvaluator = predEvaluatorFactory == null ? null
- : predEvaluatorFactory.createPredicateEvaluator();
-
- return new GraceHashJoinOperatorNodePushable(ctx,
- new TaskId(new ActivityId(getOperatorId(), RPARTITION_ACTIVITY_ID), partition),
- new TaskId(new ActivityId(getOperatorId(), SPARTITION_ACTIVITY_ID), partition), recordsPerFrame,
- factor, keys0, keys1, hashFunctionFactories, comparatorFactories, nullWriterFactories1, rd1, rd0,
- recordDescriptors[0], numPartitions, predEvaluator, isLeftOuter);
- }
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
deleted file mode 100644
index 2de8e6c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.join;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.IMissingWriter;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
-import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
-
-class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
- private final IHyracksTaskContext ctx;
- private final Object state0Id;
- private final Object state1Id;
- private final int[] keys0;
- private final int[] keys1;
- private final IBinaryHashFunctionFactory[] hashFunctionFactories;
- private final IBinaryComparatorFactory[] comparatorFactories;
- private final IMissingWriterFactory[] nonMatchWriterFactories;
- private final RecordDescriptor rd0;
- private final RecordDescriptor rd1;
- private final int recordsPerFrame;
- private final double factor;
- private final int numPartitions;
- private final boolean isLeftOuter;
- private final IPredicateEvaluator predEvaluator;
-
- GraceHashJoinOperatorNodePushable(IHyracksTaskContext ctx, Object state0Id, Object state1Id, int recordsPerFrame,
- double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
- IBinaryComparatorFactory[] comparatorFactories, IMissingWriterFactory[] nullWriterFactories,
- RecordDescriptor rd1, RecordDescriptor rd0, RecordDescriptor outRecordDescriptor, int numPartitions,
- IPredicateEvaluator predEval, boolean isLeftOuter) {
- this.ctx = ctx;
- this.state0Id = state0Id;
- this.state1Id = state1Id;
- this.keys0 = keys0;
- this.keys1 = keys1;
- this.hashFunctionFactories = hashFunctionFactories;
- this.comparatorFactories = comparatorFactories;
- this.nonMatchWriterFactories = nullWriterFactories;
- this.rd0 = rd0;
- this.rd1 = rd1;
- this.numPartitions = numPartitions;
- this.recordsPerFrame = recordsPerFrame;
- this.factor = factor;
- this.predEvaluator = predEval;
- this.isLeftOuter = isLeftOuter;
- }
-
- @Override
- public void initialize() throws HyracksDataException {
- GraceHashJoinPartitionState rState = (GraceHashJoinPartitionState) ctx.getStateObject(state0Id);
- GraceHashJoinPartitionState sState = (GraceHashJoinPartitionState) ctx.getStateObject(state1Id);
- RunFileWriter[] buildWriters = sState.getRunWriters();
- RunFileWriter[] probeWriters = rState.getRunWriters();
-
- IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
- new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
- ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(numPartitions,
- new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)).createPartitioner();
-
- final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
- if (isLeftOuter) {
- for (int i = 0; i < nonMatchWriterFactories.length; i++) {
- nullWriters1[i] = nonMatchWriterFactories[i].createMissingWriter();
- }
- }
- try {
- writer.open();// open for probe
- IFrame buffer = new VSizeFrame(ctx);
- // buffer
- int tableSize = (int) (numPartitions * recordsPerFrame * factor);
- ISerializableTable table = new SerializableHashTable(tableSize, ctx);
-
- for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
- RunFileWriter buildWriter = buildWriters[partitionid];
- RunFileWriter probeWriter = probeWriters[partitionid];
- if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
- continue;
- }
- table.reset();
- InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpcRep0,
- new FrameTupleAccessor(rd1), hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators),
- isLeftOuter, nullWriters1, table, predEvaluator);
-
- // build
- if (buildWriter != null) {
- RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
- buildReader.open();
- while (buildReader.nextFrame(buffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame(buffer.getFrameSize());
- FrameUtils.copyAndFlip(buffer.getBuffer(), copyBuffer);
- joiner.build(copyBuffer);
- buffer.reset();
- }
- buildReader.close();
- }
-
- // probe
- RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
- probeReader.open();
- while (probeReader.nextFrame(buffer)) {
- joiner.join(buffer.getBuffer(), writer);
- buffer.reset();
- }
- probeReader.close();
- joiner.closeJoin(writer);
- }
- } catch (Throwable th) {
- writer.fail();
- throw new HyracksDataException(th);
- } finally {
- writer.close();
- }
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
deleted file mode 100644
index 5a5543b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.join;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-
-class GraceHashJoinPartitionBuildOperatorNodePushable extends
- AbstractUnaryInputSinkOperatorNodePushable {
- private final IHyracksTaskContext ctx;
- private final Object stateId;
- private final int numPartitions;
- private final IBinaryComparator[] comparators;
- private final FrameTupleAccessor accessor0;
- private final ITuplePartitionComputer hpc;
- private final FrameTupleAppender appender;
- private IFrame[] outbufs;
- private GraceHashJoinPartitionState state;
-
- GraceHashJoinPartitionBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int[] keys,
- IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
- int numPartitions, RecordDescriptor inRecordDescriptor) {
- this.ctx = ctx;
- this.stateId = stateId;
- this.numPartitions = numPartitions;
- accessor0 = new FrameTupleAccessor(inRecordDescriptor);
- appender = new FrameTupleAppender();
- hpc = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories).createPartitioner();
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- for (int i = 0; i < numPartitions; i++) {
- ByteBuffer head = outbufs[i].getBuffer();
- accessor0.reset(head);
- if (accessor0.getTupleCount() > 0) {
- write(i, head);
- }
- closeWriter(i);
- }
-
- ctx.setStateObject(state);
- }
-
- private void closeWriter(int i) throws HyracksDataException {
- RunFileWriter writer = state.getRunWriters()[i];
- if (writer != null) {
- writer.close();
- }
- }
-
- private void write(int i, ByteBuffer head) throws HyracksDataException {
- RunFileWriter writer = state.getRunWriters()[i];
- if (writer == null) {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- GraceHashJoinOperatorDescriptor.class.getSimpleName());
- writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- state.getRunWriters()[i] = writer;
- }
- writer.nextFrame(head);
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor0.reset(buffer);
- int tCount = accessor0.getTupleCount();
- for (int i = 0; i < tCount; ++i) {
-
- int entry = hpc.partition(accessor0, i, numPartitions);
- IFrame outbuf = outbufs[entry];
- appender.reset(outbuf, false);
- if (!appender.append(accessor0, i)) {
- // buffer is full, ie. we cannot fit the tuple
- // into the buffer -- write it to disk
- write(entry, outbuf.getBuffer());
- outbuf.reset();
- appender.reset(outbuf, true);
- if (!appender.append(accessor0, i)) {
- throw new HyracksDataException("Item too big to fit in
frame");
- }
- }
- }
- }
-
- @Override
- public void open() throws HyracksDataException {
- state = new GraceHashJoinPartitionState(ctx.getJobletContext().getJobId(), stateId);
- outbufs = new IFrame[numPartitions];
- state.setRunWriters(new RunFileWriter[numPartitions]);
- for (int i = 0; i < numPartitions; i++) {
- outbufs[i] = new VSizeFrame(ctx);
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java
deleted file mode 100644
index a970a6c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.join;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-
-public class GraceHashJoinPartitionState extends AbstractStateObject {
- private RunFileWriter[] fWriters;
-
- public GraceHashJoinPartitionState(JobId jobId, Object id) {
- super(jobId, id);
- }
-
- public RunFileWriter[] getRunWriters() {
- return fWriters;
- }
-
- public void setRunWriters(RunFileWriter[] fWriters) {
- this.fWriters = fWriters;
- }
-
- @Override
- public void toBytes(DataOutput out) throws IOException {
-
- }
-
- @Override
- public void fromBytes(DataInput in) throws IOException {
-
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
deleted file mode 100644
index 4354367..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ /dev/null
@@ -1,563 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.join;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.IMissingWriter;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
-import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
-import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
-
-public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
- private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
-
- private final int memsize;
- private static final long serialVersionUID = 1L;
- private final int inputsize0;
- private final double factor;
- private final int recordsPerFrame;
- private final int[] keys0;
- private final int[] keys1;
- private final IBinaryHashFunctionFactory[] hashFunctionFactories;
- private final IBinaryComparatorFactory[] comparatorFactories;
- private final IPredicateEvaluatorFactory predEvaluatorFactory;
- private final boolean isLeftOuter;
- private final IMissingWriterFactory[] nonMatchWriterFactories1;
-
- /**
- * @param spec
- * @param memsize
- * in frames
- * @param inputsize0
- * in frames
- * @param recordsPerFrame
- * @param factor
- * @param keys0
- * @param keys1
- * @param hashFunctionFactories
- * @param comparatorFactories
- * @param recordDescriptor
- * @throws HyracksDataException
- */
- public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
- int recordsPerFrame, double factor, int[] keys0, int[] keys1,
- IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter,
- IMissingWriterFactory[] nullWriterFactories1) throws HyracksDataException {
- super(spec, 2, 1);
- this.memsize = memsize;
- this.inputsize0 = inputsize0;
- this.factor = factor;
- this.recordsPerFrame = recordsPerFrame;
- this.keys0 = keys0;
- this.keys1 = keys1;
- this.hashFunctionFactories = hashFunctionFactories;
- this.comparatorFactories = comparatorFactories;
- this.predEvaluatorFactory = predEvalFactory;
- this.isLeftOuter = isLeftOuter;
- this.nonMatchWriterFactories1 = nullWriterFactories1;
- recordDescriptors[0] = recordDescriptor;
- }
-
- @Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- ActivityId p1Aid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID);
- ActivityId p2Aid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID);
- BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(p1Aid, p2Aid);
- PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(p2Aid, p1Aid);
-
- builder.addActivity(this, phase1);
- builder.addSourceEdge(1, phase1, 0);
-
- builder.addActivity(this, phase2);
- builder.addSourceEdge(0, phase2, 0);
-
- builder.addBlockingEdge(phase1, phase2);
-
- builder.addTargetEdge(0, phase2, 0);
- }
-
- public static class BuildAndPartitionTaskState extends AbstractStateObject {
- private RunFileWriter[] fWriters;
- private InMemoryHashJoin joiner;
- private int nPartitions;
- private int memoryForHashtable;
-
- public BuildAndPartitionTaskState() {
- }
-
- private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
- super(jobId, taskId);
- }
-
- @Override
- public void toBytes(DataOutput out) throws IOException {
-
- }
-
- @Override
- public void fromBytes(DataInput in) throws IOException {
-
- }
-
- }
-
- private class BuildAndPartitionActivityNode extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- private final ActivityId joinAid;
-
- public BuildAndPartitionActivityNode(ActivityId id, ActivityId joinAid) {
- super(id);
- this.joinAid = joinAid;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
- final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0);
- final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories1.length]
- : null;
- if (isLeftOuter) {
- for (int i = 0; i < nonMatchWriterFactories1.length; i++) {
- nullWriters1[i] = nonMatchWriterFactories1[i].createMissingWriter();
- }
- }
- final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
- : predEvaluatorFactory.createPredicateEvaluator());
-
- IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
- ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
- private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(rd1);
- private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
- hashFunctionFactories).createPartitioner();
- private final FrameTupleAppender appender = new FrameTupleAppender();
- private final FrameTupleAppender ftappender = new FrameTupleAppender();
- private IFrame[] bufferForPartitions;
- private final IFrame inBuffer = new VSizeFrame(ctx);
-
- @Override
- public void close() throws HyracksDataException {
- if (state.memoryForHashtable != 0) {
- build(inBuffer.getBuffer());
- }
-
- for (int i = 0; i < state.nPartitions; i++) {
- ByteBuffer buf = bufferForPartitions[i].getBuffer();
- accessorBuild.reset(buf);
- if (accessorBuild.getTupleCount() > 0) {
- write(i, buf);
- }
- closeWriter(i);
- }
-
- ctx.setStateObject(state);
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-
- if (state.memoryForHashtable != memsize - 2) {
- accessorBuild.reset(buffer);
- int tCount = accessorBuild.getTupleCount();
- for (int i = 0; i < tCount; ++i) {
- int entry;
- if (state.memoryForHashtable == 0) {
- entry = hpcBuild.partition(accessorBuild, i, state.nPartitions);
- boolean newBuffer = false;
- IFrame bufBi = bufferForPartitions[entry];
- while (true) {
- appender.reset(bufBi, newBuffer);
- if (appender.append(accessorBuild, i)) {
- break;
- } else {
- write(entry, bufBi.getBuffer());
- bufBi.reset();
- newBuffer = true;
- }
- }
- } else {
- entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
- if (entry < state.memoryForHashtable) {
- while (true) {
- if (!ftappender.append(accessorBuild, i)) {
- build(inBuffer.getBuffer());
-
- ftappender.reset(inBuffer, true);
- } else {
- break;
- }
- }
- } else {
- entry %= state.nPartitions;
- boolean newBuffer = false;
- IFrame bufBi = bufferForPartitions[entry];
- while (true) {
- appender.reset(bufBi, newBuffer);
- if (appender.append(accessorBuild, i)) {
- break;
- } else {
- write(entry, bufBi.getBuffer());
- bufBi.reset();
- newBuffer = true;
- }
- }
- }
- }
-
- }
- } else {
- build(buffer);
- }
-
- }
-
- private void build(ByteBuffer inBuffer) throws HyracksDataException {
- ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.capacity());
- FrameUtils.copyAndFlip(inBuffer, copyBuffer);
- state.joiner.build(copyBuffer);
- }
-
- @Override
- public void open() throws HyracksDataException {
- if (memsize > 1) {
- if (memsize > inputsize0) {
- state.nPartitions = 0;
- } else {
- state.nPartitions = (int) (Math
- .ceil((inputsize0 * factor / nPartitions - memsize) / (memsize - 1)));
- }
- if (state.nPartitions <= 0) {
- // becomes in-memory HJ
- state.memoryForHashtable = memsize - 2;
- state.nPartitions = 0;
- } else {
- state.memoryForHashtable = memsize - state.nPartitions - 2;
- if (state.memoryForHashtable < 0) {
- state.memoryForHashtable = 0;
- state.nPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
- }
- }
- } else {
- throw new HyracksDataException("not enough memory");
- }
-
- ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
- .createPartitioner();
- ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
- .createPartitioner();
- int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
- ISerializableTable table = new SerializableHashTable(tableSize, ctx);
- state.joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpc0,
- new FrameTupleAccessor(rd1), hpc1, new FrameTuplePairComparator(keys0, keys1, comparators),
- isLeftOuter, nullWriters1, table, predEvaluator);
- bufferForPartitions = new IFrame[state.nPartitions];
- state.fWriters = new RunFileWriter[state.nPartitions];
- for (int i = 0; i < state.nPartitions; i++) {
- bufferForPartitions[i] = new VSizeFrame(ctx);
- }
-
- ftappender.reset(inBuffer, true);
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
-
- private void closeWriter(int i) throws HyracksDataException {
- RunFileWriter writer = state.fWriters[i];
- if (writer != null) {
- writer.close();
- }
- }
-
- private void write(int i, ByteBuffer head) throws HyracksDataException {
- RunFileWriter writer = state.fWriters[i];
- if (writer == null) {
- FileReference file = ctx.getJobletContext()
- .createManagedWorkspaceFile(BuildAndPartitionActivityNode.class.getSimpleName());
- writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- state.fWriters[i] = writer;
- }
- writer.nextFrame(head);
- }
- };
- return op;
- }
- }
-
- private class PartitionAndJoinActivityNode extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- private final ActivityId buildAid;
-
- public PartitionAndJoinActivityNode(ActivityId id, ActivityId buildAid) {
- super(id);
- this.buildAid = buildAid;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
- final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories1.length]
- : null;
- if (isLeftOuter) {
- for (int i = 0; i < nonMatchWriterFactories1.length; i++) {
- nullWriters1[i] = nonMatchWriterFactories1[i].createMissingWriter();
- }
- }
- final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
- : predEvaluatorFactory.createPredicateEvaluator());
-
- IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- private BuildAndPartitionTaskState state;
- private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(rd0);
- private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
- hashFunctionFactories);
- private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
- hashFunctionFactories);
- private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
-
- private final FrameTupleAppender appender = new FrameTupleAppender();
- private final FrameTupleAppender ftap = new FrameTupleAppender();
- private final IFrame inBuffer = new VSizeFrame(ctx);
- private final IFrame outBuffer = new VSizeFrame(ctx);
- private RunFileWriter[] buildWriters;
- private RunFileWriter[] probeWriters;
- private IFrame[] bufferForPartitions;
-
- @Override
- public void open() throws HyracksDataException {
- writer.open();
- state = (BuildAndPartitionTaskState) ctx.getStateObject(
- new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
- buildWriters = state.fWriters;
- probeWriters = new RunFileWriter[state.nPartitions];
- bufferForPartitions = new IFrame[state.nPartitions];
- for (int i = 0; i < state.nPartitions; i++) {
- bufferForPartitions[i] = new VSizeFrame(ctx);
- }
- appender.reset(outBuffer, true);
- ftap.reset(inBuffer, true);
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- if (state.memoryForHashtable != memsize - 2) {
- accessorProbe.reset(buffer);
- int tupleCount0 = accessorProbe.getTupleCount();
- for (int i = 0; i < tupleCount0; ++i) {
-
- int entry;
- if (state.memoryForHashtable == 0) {
- entry = hpcProbe.partition(accessorProbe, i, state.nPartitions);
- boolean newBuffer = false;
- IFrame outbuf = bufferForPartitions[entry];
- while (true) {
- appender.reset(outbuf, newBuffer);
- if (appender.append(accessorProbe, i)) {
- break;
- } else {
- write(entry, outbuf.getBuffer());
- outbuf.reset();
- newBuffer = true;
- }
- }
- } else {
- entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
- if (entry < state.memoryForHashtable) {
- while (true) {
- if (!ftap.append(accessorProbe, i)) {
- state.joiner.join(inBuffer.getBuffer(), writer);
- ftap.reset(inBuffer, true);
- } else {
- break;
- }
- }
-
- } else {
- entry %= state.nPartitions;
- boolean newBuffer = false;
- IFrame outbuf = bufferForPartitions[entry];
- while (true) {
- appender.reset(outbuf, newBuffer);
- if (appender.append(accessorProbe, i)) {
- break;
- } else {
- write(entry, outbuf.getBuffer());
- outbuf.reset();
- newBuffer = true;
- }
- }
- }
- }
- }
- } else {
- state.joiner.join(buffer, writer);
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- try {
- state.joiner.join(inBuffer.getBuffer(), writer);
- state.joiner.closeJoin(writer);
- ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
- .createPartitioner();
- ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
- .createPartitioner();
- if (state.memoryForHashtable != memsize - 2) {
- for (int i = 0; i < state.nPartitions; i++) {
- ByteBuffer buf = bufferForPartitions[i].getBuffer();
- accessorProbe.reset(buf);
- if (accessorProbe.getTupleCount() > 0) {
- write(i, buf);
- }
- closeWriter(i);
- }
-
- inBuffer.reset();
- int tableSize = -1;
- if (state.memoryForHashtable == 0) {
- tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
- } else {
- tableSize = (int) (memsize * recordsPerFrame * factor);
- }
- ISerializableTable table = new SerializableHashTable(tableSize, ctx);
- for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
- RunFileWriter buildWriter = buildWriters[partitionid];
- RunFileWriter probeWriter = probeWriters[partitionid];
- if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
- continue;
- }
- table.reset();
- InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize,
- new FrameTupleAccessor(rd0), hpcRep0, new FrameTupleAccessor(rd1), hpcRep1,
- new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
- nullWriters1, table, predEvaluator);
-
- if (buildWriter != null) {
- RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
- buildReader.open();
- while (buildReader.nextFrame(inBuffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
- FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
- joiner.build(copyBuffer);
- inBuffer.reset();
- }
- buildReader.close();
- }
-
- // probe
- RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
- probeReader.open();
- while (probeReader.nextFrame(inBuffer)) {
- joiner.join(inBuffer.getBuffer(), writer);
- inBuffer.reset();
- }
- probeReader.close();
- joiner.closeJoin(writer);
- }
- }
- } finally {
- writer.close();
- }
- }
-
- private void closeWriter(int i) throws HyracksDataException {
- RunFileWriter writer = probeWriters[i];
- if (writer != null) {
- writer.close();
- }
- }
-
- private void write(int i, ByteBuffer head) throws HyracksDataException {
- RunFileWriter writer = probeWriters[i];
- if (writer == null) {
- FileReference file = ctx
- .createManagedWorkspaceFile(PartitionAndJoinActivityNode.class.getSimpleName());
- writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- probeWriters[i] = writer;
- }
- writer.nextFrame(head);
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
- };
- return op;
- }
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
deleted file mode 100644
index 43ef74d..0000000
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ /dev/null
@@ -1,1032 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.tests.integration;
-
-import java.io.File;
-
-import org.junit.Test;
-
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.MToNBroadcastConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
-import org.apache.hyracks.tests.util.NoopMissingWriterFactory;
-import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
-
-public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
-
-    /*
-     * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL,
-     * C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY
-     * INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT
-     * NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL );
-     * TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL,
-     * O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE
-     * DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY
-     * CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT
-     * NULL, O_COMMENT VARCHAR(79) NOT NULL );
-     */
-
-    @Test
-    public void customerOrderCIDJoin() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/orders.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
-                spec,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDGraceJoin() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/orders.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
-                spec,
-                4,
-                10,
-                200,
-                1.2,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDHybridHashJoin() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/orders.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(
-                spec,
-                5,
-                20,
-                200,
-                1.2,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, false, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDInMemoryHashLeftOuterJoin() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/orders.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()];
-        for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-            nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE;
-        }
-
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
-                spec,
-                new int[] { 0 },
-                new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                null, custOrderJoinDesc, true, nonMatchWriterFactories, 128);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDGraceHashLeftOuterJoin() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/orders.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()];
-        for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-            nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE;
-        }
-
-        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
-                spec,
-                5,
-                20,
-                200,
-                1.2,
-                new int[] { 0 },
-                new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, true, nonMatchWriterFactories, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDHybridHashLeftOuterJoin() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/customer.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/orders.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
-
-        IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()];
-        for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-            nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE;
-        }
-
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(
-                spec,
-                5,
-                20,
-                200,
-                1.2,
-                new int[] { 0 },
-                new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, true, nonMatchWriterFactories);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDJoinMulti() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
-
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
-                spec,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDGraceJoinMulti() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
-
-        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
-                spec,
-                3,
-                20,
-                100,
-                1.2,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDHybridHashJoinMulti() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
-
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(
-                spec,
-                3,
-                20,
-                100,
-                1.2,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, false, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDJoinAutoExpand() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
-
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
-                spec,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(custJoinConn, custScanner, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void customerOrderCIDJoinMultiMaterialized() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
-        FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
-                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
-
-        MaterializingOperatorDescriptor ordMat = new MaterializingOperatorDescriptor(spec, ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordMat, NC1_ID, NC2_ID);
-
-        MaterializingOperatorDescriptor custMat = new MaterializingOperatorDescriptor(spec, custDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custMat, NC1_ID, NC2_ID);
-
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
-                spec,
-                new int[] { 1 },
-                new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
-
-        ResultSetId rsId = new ResultSetId(1);
-        spec.addResultSetId(rsId);
-
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor ordPartConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(ordPartConn, ordScanner, 0, ordMat, 0);
-
-        IConnectorDescriptor custPartConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(custPartConn, custScanner, 0, custMat, 0);
-
-        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(ordJoinConn, ordMat, 0, join, 0);
-
-        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(custJoinConn, custMat, 0, join, 1);
-
-        IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec);
-        spec.connect(joinPrinterConn, join, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-}
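
For anyone porting these deleted cases: the operator kept in tree is
OptimizedHybridHashJoinOperatorDescriptor. Below is a minimal sketch of the same
customer/order CID join with that operator, modeled on the existing
TPCHCustomerOptimizedHybridHashJoinTest; the frame budget, input-size estimate, key
ordering, and the MurmurHash3 hash-function family are illustrative assumptions, not
values taken from this change.

    // Assumed imports for this sketch:
    //   org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily
    //   org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor
    //   org.apache.hyracks.dataflow.std.join.JoinComparatorFactory
    // Unlike the removed Grace/Hybrid operators, the optimized operator takes hash
    // function *families* (so it can re-hash at deeper recursion levels) plus
    // tuple-pair comparator factories for the two probe/build orientations.
    OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec,
            32,              // memory budget in frames (illustrative)
            20,              // estimated build input size (illustrative)
            1.2,             // fudge factor for partition sizing
            new int[] { 0 }, // C_CUSTKEY in the customer input (illustrative wiring)
            new int[] { 1 }, // O_CUSTKEY in the orders input (illustrative wiring)
            new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
            new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
            custOrderJoinDesc,
            new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
            new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
            null);           // no predicate evaluator
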
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index 507e1c7..0d9a6b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -61,7 +61,6 @@
 import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
 import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
 import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
@@ -195,14 +194,6 @@
                     new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
                     new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
                     null);
-
-        } else if ("grace".equalsIgnoreCase(algo)) {
-            join = new GraceHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceRecordsPerFrame, graceFactor,
-                    new int[] { 0 }, new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] {
-                            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    Common.custOrderJoinDesc, null);
         } else {
             System.err.println("unknown algorithm:" + algo);
--
To view, visit https://asterix-gerrit.ics.uci.edu/1353
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I16e9e4c73d7851f18a48c2715a6bc5c903b74eba
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Taewoo Kim <[email protected]>