Taewoo Kim has submitted this change and it was merged.

Change subject: ASTERIXDB-1736: Remove Grace Hash Join (not being used)
......................................................................
ASTERIXDB-1736: Remove Grace Hash Join (not being used) - Removed Grace Hash Join that is not currently being used since we always use Optimized Hybrid Hash Join. Change-Id: I16e9e4c73d7851f18a48c2715a6bc5c903b74eba Reviewed-on: https://asterix-gerrit.ics.uci.edu/1353 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> --- D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java D hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java M hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java 6 files changed, 197 insertions(+), 985 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java deleted file mode 100644 index 2f7b1c2..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * 
or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.dataflow.std.join; - -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.ActivityId; -import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.TaskId; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; -import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; -import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; -import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; -import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; - -public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor { - private static final int RPARTITION_ACTIVITY_ID = 0; - private static final int SPARTITION_ACTIVITY_ID = 1; - private static final 
int JOIN_ACTIVITY_ID = 2; - - private static final long serialVersionUID = 1L; - private final int[] keys0; - private final int[] keys1; - private final int inputsize0; - private final int recordsPerFrame; - private final int memsize; - private final double factor; - private final IBinaryHashFunctionFactory[] hashFunctionFactories; - private final IBinaryComparatorFactory[] comparatorFactories; - private final IPredicateEvaluatorFactory predEvaluatorFactory; - private final boolean isLeftOuter; - private final IMissingWriterFactory[] nullWriterFactories1; - - public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, - int recordsPerFrame, double factor, int[] keys0, int[] keys1, - IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories, - RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory) { - super(spec, 2, 1); - this.memsize = memsize; - this.inputsize0 = inputsize0; - this.recordsPerFrame = recordsPerFrame; - this.factor = factor; - this.keys0 = keys0; - this.keys1 = keys1; - this.hashFunctionFactories = hashFunctionFactories; - this.comparatorFactories = comparatorFactories; - this.predEvaluatorFactory = predEvalFactory; - this.isLeftOuter = false; - this.nullWriterFactories1 = null; - recordDescriptors[0] = recordDescriptor; - } - - public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, - int recordsPerFrame, double factor, int[] keys0, int[] keys1, - IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories, - RecordDescriptor recordDescriptor, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1, - IPredicateEvaluatorFactory predEvalFactory) { - super(spec, 2, 1); - this.memsize = memsize; - this.inputsize0 = inputsize0; - this.recordsPerFrame = recordsPerFrame; - this.factor = factor; - this.keys0 = keys0; - this.keys1 = keys1; - 
this.hashFunctionFactories = hashFunctionFactories; - this.comparatorFactories = comparatorFactories; - this.predEvaluatorFactory = predEvalFactory; - this.isLeftOuter = isLeftOuter; - this.nullWriterFactories1 = nullWriterFactories1; - recordDescriptors[0] = recordDescriptor; - } - - @Override - public void contributeActivities(IActivityGraphBuilder builder) { - ActivityId rpartAid = new ActivityId(odId, RPARTITION_ACTIVITY_ID); - HashPartitionActivityNode rpart = new HashPartitionActivityNode(rpartAid, keys0); - ActivityId spartAid = new ActivityId(odId, SPARTITION_ACTIVITY_ID); - HashPartitionActivityNode spart = new HashPartitionActivityNode(spartAid, keys1); - JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, JOIN_ACTIVITY_ID), rpartAid, spartAid); - - builder.addActivity(this, rpart); - builder.addSourceEdge(0, rpart, 0); - - builder.addActivity(this, spart); - builder.addSourceEdge(1, spart, 0); - - builder.addActivity(this, join); - builder.addBlockingEdge(rpart, spart); - builder.addBlockingEdge(spart, join); - - builder.addTargetEdge(0, join, 0); - } - - public int getMemorySize() { - return memsize; - } - - private class HashPartitionActivityNode extends AbstractActivityNode { - private static final long serialVersionUID = 1L; - private int[] keys; - - public HashPartitionActivityNode(ActivityId id, int[] keys) { - super(id); - this.keys = keys; - } - - @Override - public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) { - return new GraceHashJoinPartitionBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), - keys, hashFunctionFactories, comparatorFactories, - (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions)), - recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)); - } - } - - private class JoinActivityNode extends AbstractActivityNode { - private static final long serialVersionUID = 1L; - - private 
final ActivityId rpartAid; - - private final ActivityId spartAid; - - public JoinActivityNode(ActivityId id, ActivityId rpartAid, ActivityId spartAid) { - super(id); - this.rpartAid = rpartAid; - this.spartAid = spartAid; - } - - @Override - public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, - final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) { - final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(rpartAid, 0); - final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(spartAid, 0); - int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions)); - final IPredicateEvaluator predEvaluator = predEvaluatorFactory == null ? null - : predEvaluatorFactory.createPredicateEvaluator(); - - return new GraceHashJoinOperatorNodePushable(ctx, - new TaskId(new ActivityId(getOperatorId(), RPARTITION_ACTIVITY_ID), partition), - new TaskId(new ActivityId(getOperatorId(), SPARTITION_ACTIVITY_ID), partition), recordsPerFrame, - factor, keys0, keys1, hashFunctionFactories, comparatorFactories, nullWriterFactories1, rd1, rd0, - recordDescriptors[0], numPartitions, predEvaluator, isLeftOuter); - } - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java deleted file mode 100644 index 2de8e6c..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.dataflow.std.join; - -import java.nio.ByteBuffer; - -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.IBinaryComparator; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; -import org.apache.hyracks.api.dataflow.value.IMissingWriter; -import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; -import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; -import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory; -import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFactory; -import org.apache.hyracks.dataflow.common.io.RunFileReader; -import org.apache.hyracks.dataflow.common.io.RunFileWriter; -import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; -import org.apache.hyracks.dataflow.std.structures.ISerializableTable; -import org.apache.hyracks.dataflow.std.structures.SerializableHashTable; - -class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable { - private final IHyracksTaskContext ctx; - private final Object state0Id; - private final Object state1Id; - private final int[] keys0; - private final int[] keys1; - private final IBinaryHashFunctionFactory[] hashFunctionFactories; - private final IBinaryComparatorFactory[] comparatorFactories; - private final IMissingWriterFactory[] nonMatchWriterFactories; - private final RecordDescriptor rd0; - private final RecordDescriptor rd1; - private final int recordsPerFrame; - private final double factor; - private final int numPartitions; - private final boolean isLeftOuter; - private final IPredicateEvaluator predEvaluator; - - GraceHashJoinOperatorNodePushable(IHyracksTaskContext ctx, Object state0Id, Object state1Id, int recordsPerFrame, - double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories, - IBinaryComparatorFactory[] comparatorFactories, IMissingWriterFactory[] nullWriterFactories, - RecordDescriptor rd1, RecordDescriptor rd0, RecordDescriptor outRecordDescriptor, int numPartitions, - IPredicateEvaluator predEval, boolean isLeftOuter) { - this.ctx = ctx; - this.state0Id = state0Id; - this.state1Id = state1Id; - this.keys0 = keys0; - this.keys1 = keys1; - this.hashFunctionFactories = hashFunctionFactories; - this.comparatorFactories = comparatorFactories; - this.nonMatchWriterFactories = nullWriterFactories; - this.rd0 = rd0; - this.rd1 = rd1; - this.numPartitions = numPartitions; - this.recordsPerFrame = recordsPerFrame; - this.factor = factor; - this.predEvaluator = predEval; - this.isLeftOuter = isLeftOuter; - } - - @Override - public void initialize() throws HyracksDataException { - 
GraceHashJoinPartitionState rState = (GraceHashJoinPartitionState) ctx.getStateObject(state0Id); - GraceHashJoinPartitionState sState = (GraceHashJoinPartitionState) ctx.getStateObject(state1Id); - RunFileWriter[] buildWriters = sState.getRunWriters(); - RunFileWriter[] probeWriters = rState.getRunWriters(); - - IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; - for (int i = 0; i < comparatorFactories.length; ++i) { - comparators[i] = comparatorFactories[i].createBinaryComparator(); - } - ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions, - new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner(); - ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(numPartitions, - new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)).createPartitioner(); - - final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null; - if (isLeftOuter) { - for (int i = 0; i < nonMatchWriterFactories.length; i++) { - nullWriters1[i] = nonMatchWriterFactories[i].createMissingWriter(); - } - } - try { - writer.open();// open for probe - IFrame buffer = new VSizeFrame(ctx); - // buffer - int tableSize = (int) (numPartitions * recordsPerFrame * factor); - ISerializableTable table = new SerializableHashTable(tableSize, ctx); - - for (int partitionid = 0; partitionid < numPartitions; partitionid++) { - RunFileWriter buildWriter = buildWriters[partitionid]; - RunFileWriter probeWriter = probeWriters[partitionid]; - if ((buildWriter == null && !isLeftOuter) || probeWriter == null) { - continue; - } - table.reset(); - InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpcRep0, - new FrameTupleAccessor(rd1), hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), - isLeftOuter, nullWriters1, table, predEvaluator); - - // build - if (buildWriter != null) { - RunFileReader buildReader 
= buildWriter.createDeleteOnCloseReader(); - buildReader.open(); - while (buildReader.nextFrame(buffer)) { - ByteBuffer copyBuffer = ctx.allocateFrame(buffer.getFrameSize()); - FrameUtils.copyAndFlip(buffer.getBuffer(), copyBuffer); - joiner.build(copyBuffer); - buffer.reset(); - } - buildReader.close(); - } - - // probe - RunFileReader probeReader = probeWriter.createDeleteOnCloseReader(); - probeReader.open(); - while (probeReader.nextFrame(buffer)) { - joiner.join(buffer.getBuffer(), writer); - buffer.reset(); - } - probeReader.close(); - joiner.closeJoin(writer); - } - } catch (Throwable th) { - writer.fail(); - throw new HyracksDataException(th); - } finally { - writer.close(); - } - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java deleted file mode 100644 index 5a5543b..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.dataflow.std.join; - -import java.nio.ByteBuffer; - -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.IBinaryComparator; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory; -import org.apache.hyracks.dataflow.common.io.RunFileWriter; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; - -class GraceHashJoinPartitionBuildOperatorNodePushable extends - AbstractUnaryInputSinkOperatorNodePushable { - private final IHyracksTaskContext ctx; - private final Object stateId; - private final int numPartitions; - private final IBinaryComparator[] comparators; - private final FrameTupleAccessor accessor0; - private final ITuplePartitionComputer hpc; - private final FrameTupleAppender appender; - private IFrame[] outbufs; - private GraceHashJoinPartitionState state; - - GraceHashJoinPartitionBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int[] keys, - IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories, - int numPartitions, RecordDescriptor inRecordDescriptor) { - this.ctx = ctx; - this.stateId = stateId; - this.numPartitions = 
numPartitions; - accessor0 = new FrameTupleAccessor(inRecordDescriptor); - appender = new FrameTupleAppender(); - hpc = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories).createPartitioner(); - comparators = new IBinaryComparator[comparatorFactories.length]; - for (int i = 0; i < comparatorFactories.length; ++i) { - comparators[i] = comparatorFactories[i].createBinaryComparator(); - } - } - - @Override - public void close() throws HyracksDataException { - for (int i = 0; i < numPartitions; i++) { - ByteBuffer head = outbufs[i].getBuffer(); - accessor0.reset(head); - if (accessor0.getTupleCount() > 0) { - write(i, head); - } - closeWriter(i); - } - - ctx.setStateObject(state); - } - - private void closeWriter(int i) throws HyracksDataException { - RunFileWriter writer = state.getRunWriters()[i]; - if (writer != null) { - writer.close(); - } - } - - private void write(int i, ByteBuffer head) throws HyracksDataException { - RunFileWriter writer = state.getRunWriters()[i]; - if (writer == null) { - FileReference file = ctx.getJobletContext().createManagedWorkspaceFile( - GraceHashJoinOperatorDescriptor.class.getSimpleName()); - writer = new RunFileWriter(file, ctx.getIOManager()); - writer.open(); - state.getRunWriters()[i] = writer; - } - writer.nextFrame(head); - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - accessor0.reset(buffer); - int tCount = accessor0.getTupleCount(); - for (int i = 0; i < tCount; ++i) { - - int entry = hpc.partition(accessor0, i, numPartitions); - IFrame outbuf = outbufs[entry]; - appender.reset(outbuf, false); - if (!appender.append(accessor0, i)) { - // buffer is full, ie. 
we cannot fit the tuple - // into the buffer -- write it to disk - write(entry, outbuf.getBuffer()); - outbuf.reset(); - appender.reset(outbuf, true); - if (!appender.append(accessor0, i)) { - throw new HyracksDataException("Item too big to fit in frame"); - } - } - } - } - - @Override - public void open() throws HyracksDataException { - state = new GraceHashJoinPartitionState(ctx.getJobletContext().getJobId(), stateId); - outbufs = new IFrame[numPartitions]; - state.setRunWriters(new RunFileWriter[numPartitions]); - for (int i = 0; i < numPartitions; i++) { - outbufs[i] = new VSizeFrame(ctx); - } - } - - @Override - public void fail() throws HyracksDataException { - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java deleted file mode 100644 index a970a6c..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.dataflow.std.join; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.dataflow.common.io.RunFileWriter; -import org.apache.hyracks.dataflow.std.base.AbstractStateObject; - -public class GraceHashJoinPartitionState extends AbstractStateObject { - private RunFileWriter[] fWriters; - - public GraceHashJoinPartitionState(JobId jobId, Object id) { - super(jobId, id); - } - - public RunFileWriter[] getRunWriters() { - return fWriters; - } - - public void setRunWriters(RunFileWriter[] fWriters) { - this.fWriters = fWriters; - } - - @Override - public void toBytes(DataOutput out) throws IOException { - - } - - @Override - public void fromBytes(DataInput in) throws IOException { - - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java index 007c7b5..5270a70 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java @@ -45,7 +45,6 @@ import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory; import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor; import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor; import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor; import 
org.apache.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor; @@ -84,130 +83,48 @@ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File( "data/tpch0.001/orders.tbl").getAbsolutePath(), false) }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor ordersDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new 
UTF8StringSerializerDeserializer() }); + RecordDescriptor custOrderJoinDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), + ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID); FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE }, '|'), custDesc); + new 
DelimitedDataTupleParserFactory( + new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, + '|'), + custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); - InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor( - spec, - new int[] { 1 }, + InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 }, new int[] { 0 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, custOrderJoinDesc, 128, null); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); - - ResultSetId rsId = new ResultSetId(1); - spec.addResultSetId(rsId); - - IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); - - IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec); - spec.connect(ordJoinConn, ordScanner, 0, join, 0); - - IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec); - spec.connect(custJoinConn, custScanner, 0, join, 1); - - IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec); - spec.connect(joinPrinterConn, join, 0, printer, 0); - - spec.addRoot(printer); - runTest(spec); - } - - @Test - public void customerOrderCIDGraceJoin() throws Exception { - JobSpecification spec = new JobSpecification(); - - FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File( - 
"data/tpch0.001/customer.tbl").getAbsolutePath(), false) }; - IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File( - "data/tpch0.001/orders.tbl").getAbsolutePath(), false) }; - IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - 
new UTF8StringSerializerDeserializer() }); - - FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID); - - FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE }, '|'), custDesc); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); - - GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor( - spec, - 4, - 10, - 200, - 1.2, - new int[] { 1 }, - new int[] { 0 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, - new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - custOrderJoinDesc, null); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); ResultSetId rsId = new ResultSetId(1); @@ -282,7 +199,7 @@ HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor( spec, - 5, + 32, 20, 200, 1.2, @@ -329,38 +246,41 @@ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File( "data/tpch0.001/orders.tbl").getAbsolutePath(), false) }; IFileSplitProvider 
ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor ordersDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor custOrderJoinDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + 
new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), + ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE }, '|'), custDesc); + new DelimitedDataTupleParserFactory( + new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + 
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, + '|'), + custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()]; @@ -368,101 +288,11 @@ nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE; } - InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor( - spec, - new int[] { 0 }, + InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 }, new int[] { 1 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, null, custOrderJoinDesc, true, nonMatchWriterFactories, 128); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); - - ResultSetId rsId = new ResultSetId(1); - spec.addResultSetId(rsId); - - IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); - - IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec); - spec.connect(ordJoinConn, ordScanner, 0, join, 1); - - IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec); - spec.connect(custJoinConn, custScanner, 0, join, 0); - - IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec); - spec.connect(joinPrinterConn, join, 0, printer, 0); - - spec.addRoot(printer); - runTest(spec); - } - - @Test - public void customerOrderCIDGraceHashLeftOuterJoin() throws Exception { - JobSpecification spec = new JobSpecification(); - - FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File( - "data/tpch0.001/customer.tbl").getAbsolutePath(), 
false) }; - IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File( - "data/tpch0.001/orders.tbl").getAbsolutePath(), false) }; - IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); - - 
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); - - FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE }, '|'), custDesc); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); - - IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()]; - for (int j = 0; j < nonMatchWriterFactories.length; j++) { - nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE; - } - - GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor( - spec, - 5, - 20, - 200, - 1.2, - new int[] { 0 }, - new int[] { 1 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, - new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - custOrderJoinDesc, true, nonMatchWriterFactories, null); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); ResultSetId rsId = new ResultSetId(1); @@ -501,38 +331,41 @@ FileSplit[] ordersSplits = new FileSplit[] { new 
FileSplit(NC2_ID, new File( "data/tpch0.001/orders.tbl").getAbsolutePath(), false) }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor ordersDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor custOrderJoinDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), + ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE }, '|'), custDesc); + new DelimitedDataTupleParserFactory( + new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, + '|'), + custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()]; @@ -540,14 +373,8 @@ nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE; } - HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor( - spec, - 5, - 20, - 200, - 1.2, - new int[] { 0 }, - new int[] { 1 }, + HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 32, 20, 200, 1.2, + new int[] { 0 }, new int[] { 1 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, custOrderJoinDesc, null, true, nonMatchWriterFactories); @@ -591,138 +418,48 @@ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl").getAbsolutePath(), false), new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl").getAbsolutePath(), false) }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor ordersDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new 
UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor custOrderJoinDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, new DelimitedDataTupleParserFactory(new IValueParserFactory[] { 
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), + ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID); FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE }, '|'), custDesc); + new DelimitedDataTupleParserFactory( + new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, + '|'), + custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID); - InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor( - spec, - new int[] { 1 }, + InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 }, new int[] { 0 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, custOrderJoinDesc, 128, null); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID); - - 
ResultSetId rsId = new ResultSetId(1); - spec.addResultSetId(rsId); - - IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); - - IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec, - new FieldHashPartitionComputerFactory(new int[] { 1 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory - .of(UTF8StringPointable.FACTORY) })); - spec.connect(ordJoinConn, ordScanner, 0, join, 0); - - IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec, - new FieldHashPartitionComputerFactory(new int[] { 0 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory - .of(UTF8StringPointable.FACTORY) })); - spec.connect(custJoinConn, custScanner, 0, join, 1); - - IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec); - spec.connect(joinPrinterConn, join, 0, printer, 0); - - spec.addRoot(printer); - runTest(spec); - } - - @Test - public void customerOrderCIDGraceJoinMulti() throws Exception { - JobSpecification spec = new JobSpecification(); - - FileSplit[] custSplits = new FileSplit[] { - new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl").getAbsolutePath(), false), - new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl").getAbsolutePath(), false) }; - IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() 
}); - - FileSplit[] ordersSplits = new FileSplit[] { - new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl").getAbsolutePath(), false), - new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl").getAbsolutePath(), false) }; - IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); - - FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, 
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID); - - FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE }, '|'), custDesc); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID); - - GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor( - spec, - 3, - 20, - 100, - 1.2, - new int[] { 1 }, - new int[] { 0 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, - new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - custOrderJoinDesc, null); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID); ResultSetId rsId = new ResultSetId(1); @@ -769,48 +506,45 @@ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl").getAbsolutePath(), false), new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl").getAbsolutePath(), false) }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new 
UTF8StringSerializerDeserializer() }); + RecordDescriptor ordersDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor custOrderJoinDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), 
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), + ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID); FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE }, '|'), custDesc); + new DelimitedDataTupleParserFactory( + new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, + '|'), + custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID); - HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor( - spec, - 3, - 20, - 100, - 1.2, - new int[] { 1 }, - new int[] { 0 }, + HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2, + new int[] { 1 }, new int[] { 0 }, new 
IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, custOrderJoinDesc, null, false, null); @@ -824,15 +558,13 @@ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec, - new FieldHashPartitionComputerFactory(new int[] { 1 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory - .of(UTF8StringPointable.FACTORY) })); + new FieldHashPartitionComputerFactory(new int[] { 1 }, new IBinaryHashFunctionFactory[] { + PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) })); spec.connect(ordJoinConn, ordScanner, 0, join, 0); IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec, - new FieldHashPartitionComputerFactory(new int[] { 0 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory - .of(UTF8StringPointable.FACTORY) })); + new FieldHashPartitionComputerFactory(new int[] { 0 }, new IBinaryHashFunctionFactory[] { + PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) })); spec.connect(custJoinConn, custScanner, 0, join, 1); IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec); @@ -860,43 +592,44 @@ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl").getAbsolutePath(), false), new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl").getAbsolutePath(), false) }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new 
UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor ordersDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor custOrderJoinDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), 
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), + ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID); FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE }, '|'), custDesc); + new DelimitedDataTupleParserFactory( + new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, + '|'), + custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID); - InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor( - spec, - new int[] { 1 }, + InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 }, new int[] { 0 
}, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, @@ -911,15 +644,13 @@ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec, - new FieldHashPartitionComputerFactory(new int[] { 1 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory - .of(UTF8StringPointable.FACTORY) })); + new FieldHashPartitionComputerFactory(new int[] { 1 }, new IBinaryHashFunctionFactory[] { + PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) })); spec.connect(ordJoinConn, ordScanner, 0, join, 0); IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec, - new FieldHashPartitionComputerFactory(new int[] { 0 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory - .of(UTF8StringPointable.FACTORY) })); + new FieldHashPartitionComputerFactory(new int[] { 0 }, new IBinaryHashFunctionFactory[] { + PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) })); spec.connect(custJoinConn, custScanner, 0, join, 1); IConnectorDescriptor joinPrinterConn = new MToNBroadcastConnectorDescriptor(spec); @@ -947,38 +678,41 @@ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl").getAbsolutePath(), false), new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl").getAbsolutePath(), false) }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new 
UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor ordersDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor custOrderJoinDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), 
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), + ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID); FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE }, '|'), custDesc); + new DelimitedDataTupleParserFactory( + new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, + '|'), + custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID); MaterializingOperatorDescriptor ordMat = new MaterializingOperatorDescriptor(spec, ordersDesc); @@ -987,9 +721,7 @@ MaterializingOperatorDescriptor custMat = new MaterializingOperatorDescriptor(spec, custDesc); 
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custMat, NC1_ID, NC2_ID); - InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor( - spec, - new int[] { 1 }, + InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 }, new int[] { 0 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, @@ -1004,15 +736,13 @@ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordPartConn = new MToNPartitioningConnectorDescriptor(spec, - new FieldHashPartitionComputerFactory(new int[] { 1 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory - .of(UTF8StringPointable.FACTORY) })); + new FieldHashPartitionComputerFactory(new int[] { 1 }, new IBinaryHashFunctionFactory[] { + PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) })); spec.connect(ordPartConn, ordScanner, 0, ordMat, 0); IConnectorDescriptor custPartConn = new MToNPartitioningConnectorDescriptor(spec, - new FieldHashPartitionComputerFactory(new int[] { 0 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory - .of(UTF8StringPointable.FACTORY) })); + new FieldHashPartitionComputerFactory(new int[] { 0 }, new IBinaryHashFunctionFactory[] { + PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) })); spec.connect(custPartConn, custScanner, 0, custMat, 0); IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java index 9f44da6..65073e0 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java @@ -61,7 +61,6 @@ import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory; import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory; import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor; -import org.apache.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor; import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor; import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory; import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor; @@ -195,14 +194,6 @@ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null); - - } else if ("grace".equalsIgnoreCase(algo)) { - join = new GraceHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceRecordsPerFrame, graceFactor, - new int[] { 0 }, new int[] { 1 }, - new IBinaryHashFunctionFactory[] { - PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, - new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - Common.custOrderJoinDesc, null); } else { System.err.println("unknown algorithm:" + algo); -- To view, visit https://asterix-gerrit.ics.uci.edu/1353 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I16e9e4c73d7851f18a48c2715a6bc5c903b74eba Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Taewoo Kim <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Taewoo Kim <[email protected]> 
Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
