Stephen Ermshar has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3453 )
Change subject: [WIP] Implementing Merge Join ...................................................................... [WIP] Implementing Merge Join Change-Id: Ica175b89b901fede3e539a9b0af7feee430304eb --- A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinChecker.java A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinCheckerFactory.java A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoiner.java A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeBranchStatus.java A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinLocks.java A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinOperatorDescriptor.java A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinTaskState.java 7 files changed, 679 insertions(+), 0 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/53/3453/1 diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinChecker.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinChecker.java new file mode 100644 index 0000000..f34268e --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinChecker.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.std.join.mergejoin; + +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor; + +import java.io.Serializable; + +public interface IMergeJoinChecker extends Serializable { + + /** + * Check to see if the right tuple should be added to memory during the merge join. + * The memory is used to check the right tuple with the remaining left tuples. + * The check is true if the next left tuple could still match with this right tuple. + * + * @param accessorLeft + * @param accessorRight + * @return boolean + * @throws HyracksDataException + */ + boolean checkToSaveInMemory(ITupleAccessor accessorLeft, ITupleAccessor accessorRight) throws HyracksDataException; + + boolean checkToSaveInMemory(IFrameTupleAccessor accessorLeft, int leftTupleIndex, IFrameTupleAccessor accessorRight, + int rightTupleIndex) throws HyracksDataException; + + /** + * Check to see if the right tuple should be removed from memory during the merge join. + * The memory is used to check the right tuple with the remaining left tuples. + * The check is true if the next left tuple is NOT able to match with this right tuple. 
+ * + * @param accessorLeft + * @param accessorRight + * @return boolean + * @throws HyracksDataException + */ + boolean checkToRemoveInMemory(ITupleAccessor accessorLeft, ITupleAccessor accessorRight) + throws HyracksDataException; + + boolean checkToRemoveInMemory(IFrameTupleAccessor accessorLeft, int leftTupleIndex, + IFrameTupleAccessor accessorRight, int rightTupleIndex) throws HyracksDataException; + + /** + * Check to see if the next right tuple should be loaded during the merge join. + * The check is true if the left tuple could match with the next right tuple. + * Once the left tuple can no longer match, the check returns false. + * + * @param accessorLeft + * @param accessorRight + * @return boolean + * @throws HyracksDataException + */ + boolean checkToLoadNextRightTuple(ITupleAccessor accessorLeft, ITupleAccessor accessorRight) + throws HyracksDataException; + + + /** + * Check to see if the left tuple should continue checking for matches. + * The check is true if the next left tuple is NOT able to match with this right tuple. 
+ * + * @param accessorLeft + * @param accessorRight + * @return boolean + * @throws HyracksDataException + */ + boolean checkIfMoreMatches(ITupleAccessor accessorLeft, ITupleAccessor accessorRight) + throws HyracksDataException; + + boolean checkIfMoreMatches(IFrameTupleAccessor accessorLeft, int leftTupleIndex, IFrameTupleAccessor accessorRight, + int rightTupleIndex) throws HyracksDataException; + + boolean checkToSaveInResult(ITupleAccessor accessorLeft, ITupleAccessor accessorRight) throws HyracksDataException; + + boolean checkToSaveInResult(IFrameTupleAccessor accessorLeft, int leftTupleIndex, IFrameTupleAccessor accessorRight, + int rightTupleIndex, boolean reversed) throws HyracksDataException; + + // ============================================= + // Copied from IIntervalMergeJoinChecker + // ============================================= + + boolean checkToRemoveLeftActive(); + + boolean checkToRemoveRightActive(); + + boolean checkToIncrementMerge(ITupleAccessor accessorLeft, ITupleAccessor accessorRight) + throws HyracksDataException; + + boolean checkToSaveInResult(long start0, long end0, long start1, long end1, boolean reversed); + + boolean checkToLoadNextRightTuple(IFrameTupleAccessor accessorLeft, int leftTupleIndex, + IFrameTupleAccessor accessorRight, int rightTupleIndex) throws HyracksDataException; + + // boolean compareIntervalPartition(int s1, int e1, int s2, int e2); + // boolean compareInterval(AIntervalPointable ipLeft, AIntervalPointable ipRight) throws HyracksDataException; + // boolean compareInterval(long start0, long end0, long start1, long end1) throws HyracksDataException; +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinCheckerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinCheckerFactory.java new file mode 100644 index 0000000..aa867d8 
--- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinCheckerFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.std.join.mergejoin; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +import java.io.Serializable; + +public interface IMergeJoinCheckerFactory extends Serializable { + + IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, IHyracksTaskContext ctx) throws + HyracksDataException; + + RangePartitioningType getLeftPartitioningType(); + + RangePartitioningType getRightPartitioningType(); + + boolean isOrderAsc(); + +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoiner.java new file mode 100644 index 0000000..7f4b204 --- /dev/null +++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoiner.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.std.join.mergejoin; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +import java.nio.ByteBuffer; + +public interface IMergeJoiner { + + void processLeftFrame(IFrameWriter writer) throws HyracksDataException; + + void processLeftClose(IFrameWriter writer) throws HyracksDataException; + + void setFrame(int partition, ByteBuffer buffer) throws HyracksDataException; + +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeBranchStatus.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeBranchStatus.java new file mode 100644 index 0000000..1bc14f3 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeBranchStatus.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.std.join.mergejoin; + +import org.apache.hyracks.dataflow.std.join.IRunFileStreamStatus; + +import java.io.Serializable; + +public class MergeBranchStatus implements IRunFileStreamStatus, Serializable { + private static final long serialVersionUID = 1L; + + public enum Stage { + UNKNOWN, + OPENED, + DATA_PROCESSING, + JOIN_PROCESSING, + CLOSED; + + public boolean isEqualOrBefore(Stage bs) { + return this.ordinal() <= bs.ordinal(); + } + } + + private boolean hasMore = true; + + private Stage stage = Stage.UNKNOWN; + + private boolean runFileWriting = false; + private boolean runFileReading = false; + + public Stage getStatus() { + return stage; + } + + public void setStageOpen() { + stage = Stage.OPENED; + } + + public void setStageData() { + stage = Stage.DATA_PROCESSING; + } + + public void setStageJoin() { + stage = Stage.JOIN_PROCESSING; + } + + public void setStageClose() { + stage = Stage.CLOSED; + } + + public boolean hasMore() { + return hasMore; + } + + public void noMore() { + this.hasMore = false; + } + + @Override + public boolean isRunFileWriting() { + return runFileWriting; + } + + @Override + public void setRunFileWriting(boolean runFileWriting) { + 
this.runFileWriting = runFileWriting; + } + + @Override + public boolean isRunFileReading() { + return runFileReading; + } + + @Override + public void setRunFileReading(boolean runFileReading) { + this.runFileReading = runFileReading; + } + + @Override + public String toString() { + return "Branch status is " + stage + ": the stream " + (hasMore ? "has more" : "is empty") + + " and the run file is " + (runFileWriting ? "WRITING " : "") + (runFileReading ? "READING " : ""); + + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinLocks.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinLocks.java new file mode 100644 index 0000000..692ca98 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinLocks.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.dataflow.std.join.mergejoin; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class MergeJoinLocks implements Serializable { + private static final long serialVersionUID = 1L; + + private final List<Lock> lock = new ArrayList<>(); + private final List<Condition> left = new ArrayList<>(); + private final List<Condition> right = new ArrayList<>(); + + public synchronized void setPartitions(int partitions) { + for (int i = lock.size(); i < partitions; ++i) { + lock.add(new ReentrantLock()); + left.add(lock.get(i).newCondition()); + right.add(lock.get(i).newCondition()); + } + } + + public Lock getLock(int partition) { + return lock.get(partition); + } + + public Condition getLeft(int partition) { + return left.get(partition); + } + + public Condition getRight(int partition) { + return right.get(partition); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinOperatorDescriptor.java new file mode 100644 index 0000000..f5a808c --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinOperatorDescriptor.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.std.join.mergejoin; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.ActivityId; +import org.apache.hyracks.api.dataflow.IActivity; +import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.TaskId; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; +import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; +import org.apache.hyracks.dataflow.std.join.mergejoin.MergeBranchStatus.Stage; + +import java.nio.ByteBuffer; + +/** + * The merge join is made up of two operators: left and right. + * The right operator loads right stream into memory for the merge process. + * The left operator streams the left input and the right memory store to merge and join the data. 
+ */ +public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { + private static final long serialVersionUID = 1L; + + private static final int LEFT_ACTIVITY_ID = 0; + private static final int RIGHT_ACTIVITY_ID = 1; + private final int[] leftKeys; + private final int[] rightKeys; + private final int memoryForJoin; + private final IMergeJoinCheckerFactory mergeJoinCheckerFactory; + + public MergeJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memoryForJoin, + RecordDescriptor recordDescriptor, int[] leftKeys, int[] rightKeys, + IMergeJoinCheckerFactory mergeJoinCheckerFactory) { + super(spec, 2, 1); + recordDescriptors[0] = recordDescriptor; + this.leftKeys = leftKeys; + this.rightKeys = rightKeys; + this.memoryForJoin = memoryForJoin; + this.mergeJoinCheckerFactory = mergeJoinCheckerFactory; + } + + @Override + public void contributeActivities(IActivityGraphBuilder builder) { + MergeJoinLocks locks = new MergeJoinLocks(); + + ActivityId leftAid = new ActivityId(odId, LEFT_ACTIVITY_ID); + ActivityId rightAid = new ActivityId(odId, RIGHT_ACTIVITY_ID); + + IActivity leftAN = new LeftJoinerActivityNode(leftAid, rightAid, locks); + IActivity rightAN = new RightDataActivityNode(rightAid, leftAid, locks); + + builder.addActivity(this, rightAN); + builder.addSourceEdge(1, rightAN, 0); + + builder.addActivity(this, leftAN); + builder.addSourceEdge(0, leftAN, 0); + builder.addTargetEdge(0, leftAN, 0); + } + + private class LeftJoinerActivityNode extends AbstractActivityNode { + private static final long serialVersionUID = 1L; + + private final MergeJoinLocks locks; + + public LeftJoinerActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) { + super(id); + this.locks = locks; + } + + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) + throws HyracksDataException { + locks.setPartitions(nPartitions); + final 
RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); + return new LeftJoinerOperator(ctx, partition, inRecordDesc); + } + + private class LeftJoinerOperator extends AbstractUnaryInputUnaryOutputOperatorNodePushable { + + private final IHyracksTaskContext ctx; + private final int partition; + private final RecordDescriptor leftRd; + private MergeJoinTaskState state; + private boolean first = true; + int count = 0; + + public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) { + this.ctx = ctx; + this.partition = partition; + this.leftRd = inRecordDesc; + } + + @Override + public void open() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + writer.open(); + state = new MergeJoinTaskState(ctx.getJobletContext().getJobId(), + new TaskId(getActivityId(), partition)); + state.leftRd = leftRd; + ctx.setStateObject(state); + locks.getRight(partition).signal(); + + do { + // Continue after joiner created in right branch. 
+ if (state.joiner == null) { + locks.getLeft(partition).await(); + } + } while (state.joiner == null); + state.status.branch[LEFT_ACTIVITY_ID].setStageOpen(); + locks.getRight(partition).signal(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + locks.getLock(partition).lock(); + + count++; + if (first) { + state.status.branch[LEFT_ACTIVITY_ID].setStageData(); + first = false; + } + try { + state.joiner.setFrame(LEFT_ACTIVITY_ID, buffer); + state.joiner.processLeftFrame(writer); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void fail() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.failed = true; + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void close() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.status.branch[LEFT_ACTIVITY_ID].noMore(); + if (state.failed) { + writer.fail(); + } else { + state.joiner.processLeftClose(writer); + writer.close(); + } + state.status.branch[LEFT_ACTIVITY_ID].setStageClose(); + locks.getRight(partition).signal(); + } finally { + locks.getLock(partition).unlock(); + } +// System.err.println("Left next calls: " + count); + } + } + } + + private class RightDataActivityNode extends AbstractActivityNode { + private static final long serialVersionUID = 1L; + + private final ActivityId joinAid; + private final MergeJoinLocks locks; + + public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) { + super(id); + this.joinAid = joinAid; + this.locks = locks; + } + + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) + throws HyracksDataException { + locks.setPartitions(nPartitions); + 
RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); + final IMergeJoinChecker mjc = mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys, ctx); + return new RightDataOperator(ctx, partition, inRecordDesc, mjc); + } + + private class RightDataOperator extends AbstractUnaryInputSinkOperatorNodePushable { + + private int partition; + private IHyracksTaskContext ctx; + private final RecordDescriptor rightRd; + private final IMergeJoinChecker mjc; + private MergeJoinTaskState state; + private boolean first = true; + int count = 0; + + public RightDataOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc, + IMergeJoinChecker mjc) { + this.ctx = ctx; + this.partition = partition; + this.rightRd = inRecordDesc; + this.mjc = mjc; + } + + @Override + public void open() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + do { + // Wait for the state to be set in the context from Left. + state = (MergeJoinTaskState) ctx.getStateObject(new TaskId(joinAid, partition)); + if (state == null) { + locks.getRight(partition).await(); + } + } while (state == null); + state.rightRd = rightRd; + state.joiner = new MergeJoiner(ctx, memoryForJoin, partition, state.status, locks, mjc, + state.leftRd, state.rightRd); + state.status.branch[RIGHT_ACTIVITY_ID].setStageOpen(); + locks.getLeft(partition).signal(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + locks.getLock(partition).lock(); + count++; + if (first) { + state.status.branch[RIGHT_ACTIVITY_ID].setStageData(); + first = false; + } + try { + while (!state.status.continueRightLoad + && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) { + // Wait for the state to request right frame unless left has finished. 
+ locks.getRight(partition).await(); + } + state.joiner.setFrame(RIGHT_ACTIVITY_ID, buffer); + state.status.continueRightLoad = false; + locks.getLeft(partition).signal(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void fail() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.failed = true; + locks.getLeft(partition).signal(); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void close() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.status.branch[RIGHT_ACTIVITY_ID].setStageClose(); + locks.getLeft(partition).signal(); + } finally { + locks.getLock(partition).unlock(); + } +// System.err.println("Right next calls: " + count); + } + } + } +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinTaskState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinTaskState.java new file mode 100644 index 0000000..6582c89 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinTaskState.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.std.join.mergejoin; + +import org.apache.hyracks.api.dataflow.TaskId; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.dataflow.std.base.AbstractStateObject; + +public class MergeJoinTaskState extends AbstractStateObject { + public MergeStatus status; + public IMergeJoiner joiner; + public boolean failed; + public RecordDescriptor leftRd; + public RecordDescriptor rightRd; + + public MergeJoinTaskState(JobId jobId, TaskId taskId) { + super(jobId, taskId); + status = new MergeStatus(); + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/3453 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: newchange Gerrit-Change-Id: Ica175b89b901fede3e539a9b0af7feee430304eb Gerrit-Change-Number: 3453 Gerrit-PatchSet: 1 Gerrit-Owner: Stephen Ermshar <[email protected]>
