Stephen Ermshar has uploaded this change for review. (https://asterix-gerrit.ics.uci.edu/3453)


Change subject: [WIP] Implementing Merge Join
......................................................................

[WIP] Implementing Merge Join

Change-Id: Ica175b89b901fede3e539a9b0af7feee430304eb
---
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinChecker.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinCheckerFactory.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoiner.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeBranchStatus.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinLocks.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinOperatorDescriptor.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinTaskState.java
7 files changed, 679 insertions(+), 0 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/53/3453/1

diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinChecker.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinChecker.java
new file mode 100644
index 0000000..f34268e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinChecker.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join.mergejoin;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor;
+
+import java.io.Serializable;
+
+/**
+ * Decision predicates used by the merge join to manage the in-memory store of right tuples and to decide
+ * which tuple pairs belong in the result.
+ */
+public interface IMergeJoinChecker extends Serializable {
+
+    /**
+     * Check to see if the right tuple should be added to memory during the merge join.
+     * The memory is used to check the right tuple with the remaining left tuples.
+     * The check is true if the next left tuple could still match with this right tuple.
+     *
+     * @param accessorLeft
+     *            accessor for the left input
+     * @param accessorRight
+     *            accessor for the right input
+     * @return true if the right tuple should be kept in memory
+     * @throws HyracksDataException
+     *             if the implementation fails to evaluate the check
+     */
+    boolean checkToSaveInMemory(ITupleAccessor accessorLeft, ITupleAccessor accessorRight)
+            throws HyracksDataException;
+
+    /**
+     * Overload of {@link #checkToSaveInMemory(ITupleAccessor, ITupleAccessor)} that addresses each tuple by its
+     * frame accessor and tuple index.
+     */
+    boolean checkToSaveInMemory(IFrameTupleAccessor accessorLeft, int leftTupleIndex,
+            IFrameTupleAccessor accessorRight, int rightTupleIndex) throws HyracksDataException;
+
+    /**
+     * Check to see if the right tuple should be removed from memory during the merge join.
+     * The memory is used to check the right tuple with the remaining left tuples.
+     * The check is true if the next left tuple is NOT able to match with this right tuple.
+     *
+     * @param accessorLeft
+     *            accessor for the left input
+     * @param accessorRight
+     *            accessor for the right input
+     * @return true if the right tuple should be removed from memory
+     * @throws HyracksDataException
+     *             if the implementation fails to evaluate the check
+     */
+    boolean checkToRemoveInMemory(ITupleAccessor accessorLeft, ITupleAccessor accessorRight)
+            throws HyracksDataException;
+
+    /**
+     * Overload of {@link #checkToRemoveInMemory(ITupleAccessor, ITupleAccessor)} that addresses each tuple by its
+     * frame accessor and tuple index.
+     */
+    boolean checkToRemoveInMemory(IFrameTupleAccessor accessorLeft, int leftTupleIndex,
+            IFrameTupleAccessor accessorRight, int rightTupleIndex) throws HyracksDataException;
+
+    /**
+     * Check to see if the next right tuple should be loaded during the merge join.
+     * The check is true if the left tuple could match with the next right tuple.
+     * Once the left tuple can no longer match, the check returns false.
+     *
+     * @param accessorLeft
+     *            accessor for the left input
+     * @param accessorRight
+     *            accessor for the right input
+     * @return true if the next right tuple should be loaded
+     * @throws HyracksDataException
+     *             if the implementation fails to evaluate the check
+     */
+    boolean checkToLoadNextRightTuple(ITupleAccessor accessorLeft, ITupleAccessor accessorRight)
+            throws HyracksDataException;
+
+    /**
+     * Overload of {@link #checkToLoadNextRightTuple(ITupleAccessor, ITupleAccessor)} that addresses each tuple by
+     * its frame accessor and tuple index.
+     */
+    boolean checkToLoadNextRightTuple(IFrameTupleAccessor accessorLeft, int leftTupleIndex,
+            IFrameTupleAccessor accessorRight, int rightTupleIndex) throws HyracksDataException;
+
+    /**
+     * Check to see if the left tuple should continue checking for matches.
+     * <p>
+     * NOTE(review): the previous description ("true if the next left tuple is NOT able to match with this right
+     * tuple") was copied from {@link #checkToRemoveInMemory(ITupleAccessor, ITupleAccessor)} and contradicts the
+     * summary above; confirm the intended contract against the implementation.
+     *
+     * @param accessorLeft
+     *            accessor for the left input
+     * @param accessorRight
+     *            accessor for the right input
+     * @return true if the left tuple should keep looking for matches
+     * @throws HyracksDataException
+     *             if the implementation fails to evaluate the check
+     */
+    boolean checkIfMoreMatches(ITupleAccessor accessorLeft, ITupleAccessor accessorRight)
+            throws HyracksDataException;
+
+    /**
+     * Overload of {@link #checkIfMoreMatches(ITupleAccessor, ITupleAccessor)} that addresses each tuple by its
+     * frame accessor and tuple index.
+     */
+    boolean checkIfMoreMatches(IFrameTupleAccessor accessorLeft, int leftTupleIndex,
+            IFrameTupleAccessor accessorRight, int rightTupleIndex) throws HyracksDataException;
+
+    /**
+     * Check to see if the pair of tuples should be saved in the join result.
+     *
+     * @param accessorLeft
+     *            accessor for the left input
+     * @param accessorRight
+     *            accessor for the right input
+     * @return true if the pair belongs in the result
+     * @throws HyracksDataException
+     *             if the implementation fails to evaluate the check
+     */
+    boolean checkToSaveInResult(ITupleAccessor accessorLeft, ITupleAccessor accessorRight)
+            throws HyracksDataException;
+
+    /**
+     * Overload of {@link #checkToSaveInResult(ITupleAccessor, ITupleAccessor)} that addresses each tuple by its
+     * frame accessor and tuple index.
+     *
+     * @param reversed
+     *            NOTE(review): undocumented by the author; presumably signals that the two inputs are swapped
+     */
+    boolean checkToSaveInResult(IFrameTupleAccessor accessorLeft, int leftTupleIndex,
+            IFrameTupleAccessor accessorRight, int rightTupleIndex, boolean reversed) throws HyracksDataException;
+
+    // =============================================
+    // Methods carried over from IIntervalMergeJoinChecker.
+    // TODO(review): these look interval-specific (start/end endpoints); consider an interval sub-interface.
+    // =============================================
+
+    boolean checkToRemoveLeftActive();
+
+    boolean checkToRemoveRightActive();
+
+    boolean checkToIncrementMerge(ITupleAccessor accessorLeft, ITupleAccessor accessorRight)
+            throws HyracksDataException;
+
+    /**
+     * Overload of the result check that works directly on two (start, end) endpoint pairs.
+     */
+    boolean checkToSaveInResult(long start0, long end0, long start1, long end1, boolean reversed);
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinCheckerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinCheckerFactory.java
new file mode 100644
index 0000000..aa867d8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoinCheckerFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join.mergejoin;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import 
org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import java.io.Serializable;
+
+/**
+ * Serializable factory for {@link IMergeJoinChecker} instances; a checker is created when a task's runtime is
+ * built.
+ */
+public interface IMergeJoinCheckerFactory extends Serializable {
+
+    /**
+     * Creates a checker for the given join keys.
+     *
+     * @param keys0
+     *            key columns of the left input (the merge join descriptor passes its left keys here)
+     * @param keys1
+     *            key columns of the right input
+     * @param ctx
+     *            context of the task that will use the checker
+     * @return a new checker
+     * @throws HyracksDataException
+     *             if the checker cannot be created
+     */
+    IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, IHyracksTaskContext ctx)
+            throws HyracksDataException;
+
+    /** @return the range-partitioning type associated with the left input */
+    RangePartitioningType getLeftPartitioningType();
+
+    /** @return the range-partitioning type associated with the right input */
+    RangePartitioningType getRightPartitioningType();
+
+    /** @return presumably true when the inputs are ordered ascending — TODO(review) confirm */
+    boolean isOrderAsc();
+
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoiner.java
new file mode 100644
index 0000000..7f4b204
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/IMergeJoiner.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join.mergejoin;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Joins the two inputs of one merge-join partition. Frames of either input are handed over through
+ * {@link #setFrame(int, ByteBuffer)}; processing is driven from the left side and results go to a writer.
+ */
+public interface IMergeJoiner {
+
+    /**
+     * Processes the left frame most recently supplied through {@link #setFrame(int, ByteBuffer)}.
+     *
+     * @param writer
+     *            destination for join results
+     * @throws HyracksDataException
+     *             if processing or writing fails
+     */
+    void processLeftFrame(IFrameWriter writer) throws HyracksDataException;
+
+    /**
+     * Finishes the join once the left input has no more frames.
+     *
+     * @param writer
+     *            destination for any remaining join results
+     * @throws HyracksDataException
+     *             if processing or writing fails
+     */
+    void processLeftClose(IFrameWriter writer) throws HyracksDataException;
+
+    /**
+     * Supplies the next frame of one input.
+     *
+     * @param partition
+     *            input branch index; the merge join descriptor passes 0 for the left and 1 for the right input
+     * @param buffer
+     *            the frame
+     * @throws HyracksDataException
+     *             if the frame cannot be accepted
+     */
+    void setFrame(int partition, ByteBuffer buffer) throws HyracksDataException;
+
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeBranchStatus.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeBranchStatus.java
new file mode 100644
index 0000000..1bc14f3
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeBranchStatus.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join.mergejoin;
+
+import org.apache.hyracks.dataflow.std.join.IRunFileStreamStatus;
+
+import java.io.Serializable;
+
+/**
+ * Progress of one branch (input) of the merge join: its lifecycle stage, whether more input is expected, and
+ * whether its run file is currently being written or read.
+ * <p>
+ * Not synchronized; NOTE(review): callers in the merge join descriptor access it while holding the partition lock.
+ */
+public class MergeBranchStatus implements IRunFileStreamStatus, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Lifecycle stages of a branch. Declaration order is significant: {@link #isEqualOrBefore(Stage)} compares
+     * stages by ordinal, so new stages must be inserted in lifecycle order.
+     */
+    public enum Stage {
+        UNKNOWN,
+        OPENED,
+        DATA_PROCESSING,
+        JOIN_PROCESSING,
+        CLOSED;
+
+        /**
+         * @param bs
+         *            stage to compare against
+         * @return true if this stage equals {@code bs} or is declared before it
+         */
+        public boolean isEqualOrBefore(Stage bs) {
+            return this.ordinal() <= bs.ordinal();
+        }
+    }
+
+    private boolean hasMore = true;
+
+    private Stage stage = Stage.UNKNOWN;
+
+    private boolean runFileWriting = false;
+    private boolean runFileReading = false;
+
+    /** @return the current lifecycle stage */
+    public Stage getStatus() {
+        return stage;
+    }
+
+    public void setStageOpen() {
+        stage = Stage.OPENED;
+    }
+
+    public void setStageData() {
+        stage = Stage.DATA_PROCESSING;
+    }
+
+    public void setStageJoin() {
+        stage = Stage.JOIN_PROCESSING;
+    }
+
+    public void setStageClose() {
+        stage = Stage.CLOSED;
+    }
+
+    /** @return false once {@link #noMore()} has been called */
+    public boolean hasMore() {
+        return hasMore;
+    }
+
+    /** Records that this branch will deliver no further input. */
+    public void noMore() {
+        this.hasMore = false;
+    }
+
+    @Override
+    public boolean isRunFileWriting() {
+        return runFileWriting;
+    }
+
+    @Override
+    public void setRunFileWriting(boolean runFileWriting) {
+        this.runFileWriting = runFileWriting;
+    }
+
+    @Override
+    public boolean isRunFileReading() {
+        return runFileReading;
+    }
+
+    @Override
+    public void setRunFileReading(boolean runFileReading) {
+        this.runFileReading = runFileReading;
+    }
+
+    @Override
+    public String toString() {
+        String runFile = (runFileWriting ? "WRITING " : "") + (runFileReading ? "READING " : "");
+        // Avoid a dangling "the run file is " and a trailing blank when the run file is neither written nor read.
+        return "Branch status is " + stage + ": the stream " + (hasMore ? "has more" : "is empty")
+                + " and the run file is " + (runFile.isEmpty() ? "IDLE" : runFile.trim());
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinLocks.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinLocks.java
new file mode 100644
index 0000000..692ca98
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinLocks.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join.mergejoin;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Per-partition synchronization shared by the two activities of a merge join. Each partition owns one
+ * {@link ReentrantLock} plus two conditions created from that lock, one for waking the left activity and one
+ * for waking the right activity.
+ */
+public class MergeJoinLocks implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final List<Lock> lock = new ArrayList<>();
+    private final List<Condition> left = new ArrayList<>();
+    private final List<Condition> right = new ArrayList<>();
+
+    /**
+     * Grows the table so that it holds at least {@code partitions} entries. Existing entries are never replaced,
+     * so repeated calls with the same count are no-ops.
+     *
+     * @param partitions
+     *            required number of partitions
+     */
+    public synchronized void setPartitions(int partitions) {
+        while (lock.size() < partitions) {
+            Lock partitionLock = new ReentrantLock();
+            lock.add(partitionLock);
+            left.add(partitionLock.newCondition());
+            right.add(partitionLock.newCondition());
+        }
+    }
+
+    /** @return the lock guarding {@code partition} */
+    public Lock getLock(int partition) {
+        return lock.get(partition);
+    }
+
+    /** @return the condition, bound to {@link #getLock(int)}, used to wake the left activity */
+    public Condition getLeft(int partition) {
+        return left.get(partition);
+    }
+
+    /** @return the condition, bound to {@link #getLock(int)}, used to wake the right activity */
+    public Condition getRight(int partition) {
+        return right.get(partition);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinOperatorDescriptor.java
new file mode 100644
index 0000000..f5a808c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinOperatorDescriptor.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join.mergejoin;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivity;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.join.mergejoin.MergeBranchStatus.Stage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The merge join is made up of two activities: left and right.
+ * The right activity loads the right stream into memory for the merge process.
+ * The left activity streams the left input and the right memory store to merge and join the data.
+ * <p>
+ * For each partition the two activities coordinate through the lock and the left/right conditions of
+ * {@link MergeJoinLocks}. The left activity publishes the shared {@link MergeJoinTaskState}; the right activity
+ * then creates the joiner. After that, the right activity hands over a frame whenever
+ * {@code continueRightLoad} is set, or once the left branch has closed.
+ */
+public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private static final int LEFT_ACTIVITY_ID = 0;
+    private static final int RIGHT_ACTIVITY_ID = 1;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final int memoryForJoin;
+    private final IMergeJoinCheckerFactory mergeJoinCheckerFactory;
+
+    public MergeJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memoryForJoin,
+            RecordDescriptor recordDescriptor, int[] leftKeys, int[] rightKeys,
+            IMergeJoinCheckerFactory mergeJoinCheckerFactory) {
+        super(spec, 2, 1);
+        recordDescriptors[0] = recordDescriptor;
+        this.leftKeys = leftKeys;
+        this.rightKeys = rightKeys;
+        this.memoryForJoin = memoryForJoin;
+        this.mergeJoinCheckerFactory = mergeJoinCheckerFactory;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        MergeJoinLocks locks = new MergeJoinLocks();
+
+        ActivityId leftAid = new ActivityId(odId, LEFT_ACTIVITY_ID);
+        ActivityId rightAid = new ActivityId(odId, RIGHT_ACTIVITY_ID);
+
+        IActivity leftAN = new LeftJoinerActivityNode(leftAid, locks);
+        IActivity rightAN = new RightDataActivityNode(rightAid, leftAid, locks);
+
+        // Operator input 1 feeds the right activity; input 0 and output 0 belong to the left activity.
+        builder.addActivity(this, rightAN);
+        builder.addSourceEdge(1, rightAN, 0);
+
+        builder.addActivity(this, leftAN);
+        builder.addSourceEdge(0, leftAN, 0);
+        builder.addTargetEdge(0, leftAN, 0);
+    }
+
+    /**
+     * Restores the thread's interrupt flag and converts the interruption into a failure. The caller must rethrow
+     * it, so the task fails instead of continuing with incomplete state.
+     */
+    private static HyracksDataException interruptedFailure(InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return HyracksDataException.create(e);
+    }
+
+    /** Activity that consumes the left input, drives the joiner and produces the operator's output. */
+    private class LeftJoinerActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        private final MergeJoinLocks locks;
+
+        public LeftJoinerActivityNode(ActivityId id, MergeJoinLocks locks) {
+            super(id);
+            this.locks = locks;
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                throws HyracksDataException {
+            locks.setPartitions(nPartitions);
+            final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            return new LeftJoinerOperator(ctx, partition, inRecordDesc);
+        }
+
+        private class LeftJoinerOperator extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+            private final IHyracksTaskContext ctx;
+            private final int partition;
+            private final RecordDescriptor leftRd;
+            private MergeJoinTaskState state;
+            private boolean first = true;
+            // Tracked locally as well because fail() may run before the shared state has been created.
+            private boolean failed = false;
+
+            public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
+                this.ctx = ctx;
+                this.partition = partition;
+                this.leftRd = inRecordDesc;
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    writer.open();
+                    state = new MergeJoinTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
+                    state.leftRd = leftRd;
+                    ctx.setStateObject(state);
+                    locks.getRight(partition).signal();
+
+                    // Continue once the right branch has created the joiner, or stop if it failed first
+                    // (the right activity's fail() signals this condition).
+                    while (state.joiner == null && !state.failed) {
+                        locks.getLeft(partition).await();
+                    }
+                    if (state.joiner == null) {
+                        throw HyracksDataException.create(
+                                new IllegalStateException("merge join failed before the joiner was created"));
+                    }
+                    state.status.branch[LEFT_ACTIVITY_ID].setStageOpen();
+                    locks.getRight(partition).signal();
+                } catch (InterruptedException e) {
+                    throw interruptedFailure(e);
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                locks.getLock(partition).lock();
+                // Everything after lock() sits inside try so the lock is always released.
+                try {
+                    if (first) {
+                        state.status.branch[LEFT_ACTIVITY_ID].setStageData();
+                        first = false;
+                    }
+                    state.joiner.setFrame(LEFT_ACTIVITY_ID, buffer);
+                    state.joiner.processLeftFrame(writer);
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    failed = true;
+                    if (state != null) {
+                        state.failed = true;
+                    }
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    if (state == null) {
+                        // open() failed before the shared state was published; there is nothing to join.
+                        writer.fail();
+                        return;
+                    }
+                    state.status.branch[LEFT_ACTIVITY_ID].noMore();
+                    if (failed || state.failed || state.joiner == null) {
+                        writer.fail();
+                    } else {
+                        state.joiner.processLeftClose(writer);
+                    }
+                } finally {
+                    try {
+                        // IFrameWriter contract: close() must follow open(), even after fail().
+                        writer.close();
+                    } finally {
+                        if (state != null) {
+                            // Always release the right branch; it may be waiting for the left side to close.
+                            state.status.branch[LEFT_ACTIVITY_ID].setStageClose();
+                            locks.getRight(partition).signal();
+                        }
+                        locks.getLock(partition).unlock();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Activity that consumes the right input. It creates the joiner and hands it right frames when the join
+     * asks for them.
+     */
+    private class RightDataActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        private final ActivityId joinAid;
+        private final MergeJoinLocks locks;
+
+        public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
+            super(id);
+            this.joinAid = joinAid;
+            this.locks = locks;
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                throws HyracksDataException {
+            locks.setPartitions(nPartitions);
+            RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            final IMergeJoinChecker mjc = mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys, ctx);
+            return new RightDataOperator(ctx, partition, inRecordDesc, mjc);
+        }
+
+        private class RightDataOperator extends AbstractUnaryInputSinkOperatorNodePushable {
+
+            private final int partition;
+            private final IHyracksTaskContext ctx;
+            private final RecordDescriptor rightRd;
+            private final IMergeJoinChecker mjc;
+            private MergeJoinTaskState state;
+            private boolean first = true;
+
+            public RightDataOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc,
+                    IMergeJoinChecker mjc) {
+                this.ctx = ctx;
+                this.partition = partition;
+                this.rightRd = inRecordDesc;
+                this.mjc = mjc;
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    do {
+                        // Wait for the left branch to publish the shared state in the task context.
+                        state = (MergeJoinTaskState) ctx.getStateObject(new TaskId(joinAid, partition));
+                        if (state == null) {
+                            locks.getRight(partition).await();
+                        }
+                    } while (state == null);
+                    state.rightRd = rightRd;
+                    state.joiner = new MergeJoiner(ctx, memoryForJoin, partition, state.status, locks, mjc,
+                            state.leftRd, state.rightRd);
+                    state.status.branch[RIGHT_ACTIVITY_ID].setStageOpen();
+                    locks.getLeft(partition).signal();
+                } catch (InterruptedException e) {
+                    throw interruptedFailure(e);
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                locks.getLock(partition).lock();
+                // Everything after lock() sits inside try so the lock is always released.
+                try {
+                    if (first) {
+                        state.status.branch[RIGHT_ACTIVITY_ID].setStageData();
+                        first = false;
+                    }
+                    while (!state.status.continueRightLoad
+                            && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) {
+                        // Wait for the state to request a right frame unless left has finished.
+                        locks.getRight(partition).await();
+                    }
+                    state.joiner.setFrame(RIGHT_ACTIVITY_ID, buffer);
+                    state.status.continueRightLoad = false;
+                    locks.getLeft(partition).signal();
+                } catch (InterruptedException e) {
+                    // Fail instead of silently dropping this frame.
+                    throw interruptedFailure(e);
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    if (state != null) {
+                        state.failed = true;
+                        // Wake the left branch so it can observe the failure.
+                        locks.getLeft(partition).signal();
+                    }
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    if (state != null) {
+                        state.status.branch[RIGHT_ACTIVITY_ID].setStageClose();
+                        locks.getLeft(partition).signal();
+                    }
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinTaskState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinTaskState.java
new file mode 100644
index 0000000..6582c89
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/mergejoin/MergeJoinTaskState.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join.mergejoin;
+
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
+
+/**
+ * State shared by the left and right activities of one merge-join partition. In this change the left activity
+ * creates it and registers it in the task context, and the right activity looks it up there.
+ * Fields are public and unsynchronized; NOTE(review): callers guard them with the partition lock.
+ */
+public class MergeJoinTaskState extends AbstractStateObject {
+    /** Progress of both branches; created together with this object. */
+    public MergeStatus status;
+    /** Joiner for this partition; assigned by the right activity when it opens. */
+    public IMergeJoiner joiner;
+    /** Set when a branch reports failure through fail(). */
+    public boolean failed;
+    /** Record layout of the left input. */
+    public RecordDescriptor leftRd;
+    /** Record layout of the right input. */
+    public RecordDescriptor rightRd;
+
+    public MergeJoinTaskState(JobId jobId, TaskId taskId) {
+        super(jobId, taskId);
+        this.status = new MergeStatus();
+    }
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/3453
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ica175b89b901fede3e539a9b0af7feee430304eb
Gerrit-Change-Number: 3453
Gerrit-PatchSet: 1
Gerrit-Owner: Stephen Ermshar <[email protected]>

Reply via email to