From Janhavi Tripurwar <janhavi.tripur...@couchbase.com>:

Janhavi Tripurwar has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20034 )


Change subject: wip: merge join
......................................................................

wip: merge join

Change-Id: I29b62ad57b453f32c6390e3a1a98c407dff70680
---
A 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoinOperatorDescriptor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
A 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeJoinPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
A 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoin.java
6 files changed, 554 insertions(+), 2 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/34/20034/1

diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index c052d58..fc9cdf9 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -84,5 +84,6 @@
     WRITE_RESULT,
     INTERSECT,
     WINDOW,
-    WINDOW_STREAM
+    WINDOW_STREAM,
+    SORT_MERGE_JOIN
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeJoinPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeJoinPOperator.java
new file mode 100644
index 0000000..747f30c
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeJoinPOperator.java
@@ -0,0 +1,132 @@
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.evaluators.TuplePairEvaluatorFactory;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.join.SortMergeJoinOperatorDescriptor;
+
+public class SortMergeJoinPOperator extends AbstractJoinPOperator {
+    public SortMergeJoinPOperator(AbstractBinaryJoinOperator.JoinKind kind, 
JoinPartitioningType partitioningType) {
+        super(kind, partitioningType);
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SORT_MERGE_JOIN;
+    }
+
+    @Override
+    public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) throws AlgebricksException {
+        if (partitioningType != JoinPartitioningType.BROADCAST) {
+            throw new NotImplementedException(partitioningType + " sort merge 
joins are not implemented.");
+        }
+
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
+
+        // TODO: leverage statistics to make better decisions.
+        pv[0] = 
OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new 
StructuralPropertiesVector(
+                new 
RandomPartitioningProperty(context.getComputationNodeDomain()), null));
+        pv[1] = 
OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new 
StructuralPropertiesVector(
+                new 
BroadcastPartitioningProperty(context.getComputationNodeDomain()), null));
+        return new PhysicalRequirements(pv, 
IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator iop, 
IOptimizationContext context)
+            throws AlgebricksException {
+        if (partitioningType != JoinPartitioningType.BROADCAST) {
+            throw new NotImplementedException(partitioningType + " nested loop 
joins are not implemented.");
+        }
+
+        IPartitioningProperty pp;
+        AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
+
+        //copied for now
+        if (op.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+            AbstractLogicalOperator leftOp = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
+            IPhysicalPropertiesVector leftOpProperties = 
leftOp.getPhysicalOperator().getDeliveredProperties();
+            if (leftOpProperties == null) {
+                pp = null;
+            } else {
+                pp = leftOpProperties.getPartitioningProperty();
+            }
+        } else {
+            pp = IPartitioningProperty.UNPARTITIONED;
+        }
+
+        this.deliveredProperties = new StructuralPropertiesVector(pp, null);
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
+        RecordDescriptor recDescriptor =
+                
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), 
propagatedSchema, context);
+        IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
+        conditionInputSchemas[0] = propagatedSchema;
+        IExpressionRuntimeProvider expressionRuntimeProvider = 
context.getExpressionRuntimeProvider();
+        IScalarEvaluatorFactory cond = 
expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(),
+                context.getTypeEnvironment(op), conditionInputSchemas, 
context);
+        ITuplePairComparatorFactory comparatorFactory =
+                new TuplePairEvaluatorFactory(cond, false, 
context.getBinaryBooleanInspectorFactory());
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        IOperatorDescriptor opDesc;
+
+        int memSize = localMemoryRequirements.getMemoryBudgetInFrames();
+        switch (kind) {
+            case INNER:
+                opDesc = new SortMergeJoinOperatorDescriptor(spec, 
comparatorFactory, recDescriptor, memSize, false,
+                        null);
+                break;
+            case LEFT_OUTER:
+                IMissingWriterFactory[] nonMatchWriterFactories = 
JobGenHelper.createMissingWriterFactories(context,
+                        ((LeftOuterJoinOperator) join).getMissingValue(), 
inputSchemas[1].getSize());
+                opDesc = new SortMergeJoinOperatorDescriptor(spec, 
comparatorFactory, recDescriptor, memSize, true,
+                        nonMatchWriterFactories);
+                break;
+            default:
+                throw new NotImplementedException();
+        }
+
+        opDesc.setSourceLocation(join.getSourceLocation());
+        contributeOpDesc(builder, join, opDesc);
+
+        ILogicalOperator src1 = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src1, 0, op, 0);
+        ILogicalOperator src2 = op.getInputs().get(1).getValue();
+        builder.contributeGraphEdge(src2, 0, op, 1);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 085580f..e65e8c2 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -45,6 +45,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.HybridHashJoinPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryHashJoinPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeJoinPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
@@ -123,7 +124,8 @@
             }
         } else {
             warnIfCrossProduct(conditionExpr, op.getSourceLocation(), context);
-            setNestedLoopJoinOp(op);
+            //                        setNestedLoopJoinOp(op);
+            setSortMergeJoinOp(op);
         }
     }

@@ -131,6 +133,10 @@
         op.setPhysicalOperator(new NestedLoopJoinPOperator(op.getJoinKind(), 
JoinPartitioningType.BROADCAST));
     }

+    private static void setSortMergeJoinOp(AbstractBinaryJoinOperator op) {
+        op.setPhysicalOperator(new SortMergeJoinPOperator(op.getJoinKind(), 
JoinPartitioningType.BROADCAST));
+    }
+
     private static void setHashJoinOp(AbstractBinaryJoinOperator op, 
JoinPartitioningType partitioningType,
             List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, 
IOptimizationContext context) {
         op.setPhysicalOperator(new HybridHashJoinPOperator(op.getJoinKind(), 
partitioningType, sideLeft, sideRight,
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 831753f..e9a4b83 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -105,6 +105,7 @@
                 IRecordDescriptorProvider recordDescProvider, final int 
partition, int nPartitions)
                 throws HyracksDataException {
             final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
+            // The following two lines obtain the record descriptors (schemas) of the two input streams (datasets).
             final RecordDescriptor rd0 = 
recordDescProvider.getInputRecordDescriptor(nljAid, 0);
             final RecordDescriptor rd1 = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);

diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoin.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoin.java
new file mode 100644
index 0000000..884a929
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoin.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriter;
+import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
+import 
org.apache.hyracks.dataflow.std.buffermanager.FrameFreeSlotPolicyFactory;
+import 
org.apache.hyracks.dataflow.std.buffermanager.VariableFrameMemoryManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
+
+public class SortMergeJoin {
+
+    private static final int MIN_FRAME_BUDGET_INNER_JOIN = 3;
+    private static final int MIN_FRAME_BUDGET_OUTER_JOIN = 
MIN_FRAME_BUDGET_INNER_JOIN + 1;
+
+    private final FrameTupleAccessor accessorLeft;
+    private final FrameTupleAccessor accessorRight;
+    private final FrameTupleAppender appender;
+    private ITuplePairComparator tpComparator;
+    private final IFrame outBuffer;
+    private final IFrame innerBuffer;
+
+    private final VariableFrameMemoryManager leftBufferMngr;
+    private final VariableFrameMemoryManager rightBufferMngr;
+
+    public SortMergeJoin(IHyracksJobletContext jobletContext, 
FrameTupleAccessor accessorLeft,
+            FrameTupleAccessor accessorRight, int memBudgetInFrames, boolean 
isLeftOuter,
+            IMissingWriter[] missingWriters) throws HyracksDataException {
+        this(jobletContext, accessorLeft, accessorRight, memBudgetInFrames, 
isLeftOuter, missingWriters, false);
+    }
+
+    public SortMergeJoin(IHyracksJobletContext jobletContext, 
FrameTupleAccessor accessorLeft,
+            FrameTupleAccessor accessorRight, int memBudgetInFrames, boolean 
isLeftOuter,
+            IMissingWriter[] missingWriters, boolean isRightOuter) throws 
HyracksDataException {
+
+        this.accessorLeft = accessorLeft; //rd1
+        this.accessorRight = accessorRight; //rd0
+        this.appender = new FrameTupleAppender();
+        this.outBuffer = new VSizeFrame(jobletContext);
+        this.innerBuffer = new VSizeFrame(jobletContext);
+        this.appender.reset(outBuffer, true);
+
+        int minMemBudgetInFrames = isLeftOuter ? MIN_FRAME_BUDGET_OUTER_JOIN : 
MIN_FRAME_BUDGET_INNER_JOIN;
+        if (memBudgetInFrames < minMemBudgetInFrames) {
+            throw new HyracksDataException(ErrorCode.INSUFFICIENT_MEMORY);
+        }
+
+        int outerBufferMngrMemBudgetInFrames = memBudgetInFrames - 
minMemBudgetInFrames + 1;
+        int outerBufferMngrMemBudgetInBytes = 
jobletContext.getInitialFrameSize() * outerBufferMngrMemBudgetInFrames;
+        this.rightBufferMngr = new VariableFrameMemoryManager(
+                new VariableFramePool(jobletContext, 
outerBufferMngrMemBudgetInBytes), FrameFreeSlotPolicyFactory
+                        .createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, 
outerBufferMngrMemBudgetInFrames));
+
+        this.leftBufferMngr = new VariableFrameMemoryManager(
+                new VariableFramePool(jobletContext, 
outerBufferMngrMemBudgetInBytes), FrameFreeSlotPolicyFactory
+                        .createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, 
outerBufferMngrMemBudgetInFrames));
+
+    }
+
+    public void mergeTuples(IFrameWriter writer) throws HyracksDataException {
+        int leftCount = accessorLeft.getTupleCount();
+        int rightCount = accessorRight.getTupleCount();
+        int leftIndex = 0, rightIndex = 0;
+        int mark = -1;
+
+        while (true) {
+            if (mark < 0) {
+                if (leftIndex >= leftCount || rightIndex >= rightCount) {
+                    break;
+                }
+                while (leftIndex < leftCount && rightIndex < rightCount && 
compareTuples(leftIndex, rightIndex) < 0) {
+                    leftIndex++;
+                }
+                while (leftIndex < leftCount && rightIndex < rightCount && 
compareTuples(leftIndex, rightIndex) > 0) {
+                    rightIndex++;
+                }
+                mark = rightIndex;
+            }
+
+            if (leftIndex < leftCount && rightIndex < rightCount && 
compareTuples(leftIndex, rightIndex) == 0) {
+                //join condition is satisfied
+                appendToResults(leftIndex, rightIndex, writer);
+                rightIndex++;
+            } else {
+                rightIndex = mark;
+                leftIndex++;
+                mark = -1;
+            }
+        }
+    }
+
+    private int compareTuples(int leftIndex, int rightIndex) throws 
HyracksDataException {
+        int compareResult = tpComparator.compare(accessorLeft, leftIndex, 
accessorRight, rightIndex);
+        // Return 0 when join condition is satisfied
+        return compareResult;
+
+    }
+
+    public void mergeWithLeftFrame(ByteBuffer rightBuffer, IFrameWriter 
writer) throws HyracksDataException {
+        accessorLeft.reset(rightBuffer);
+        if (accessorLeft.getTupleCount() <= 0) {
+            return;
+        }
+        mergeTuples(writer);
+    }
+
+    private void appendToResults(int leftTupleId, int rightTupleId, 
IFrameWriter writer) throws HyracksDataException {
+        FrameUtils.appendConcatToWriter(writer, appender, accessorLeft, 
leftTupleId, accessorRight, rightTupleId);
+    }
+
+    /**
+     * Collect left frames using buffer manager
+     */
+    public void collectLeftFrame(ByteBuffer buffer) throws 
HyracksDataException {
+        accessorLeft.reset(buffer);
+        if (accessorLeft.getTupleCount() > 0) {
+            // Insert frame into buffer manager
+            if (leftBufferMngr.insertFrame(buffer) < 0) {
+                throw new HyracksDataException("Cannot insert left frame - 
buffer full");
+            }
+        }
+    }
+
+    public void collectRightFrame(ByteBuffer buffer) throws 
HyracksDataException {
+        accessorRight.reset(buffer);
+        if (accessorRight.getTupleCount() > 0) {
+            // Insert frame into buffer manager
+            if (rightBufferMngr.insertFrame(buffer) < 0) {
+                throw new HyracksDataException("Cannot insert right frame - 
buffer full");
+            }
+        }
+    }
+
+    /**
+     * Must be called before starting to join to set the right comparator with 
the right context.
+     *
+     * @param comparator the comparator to use for comparing the probe tuples 
against the build tuples
+     */
+    void setComparator(ITuplePairComparator comparator) {
+        tpComparator = comparator;
+    }
+
+    public void completeJoin(IFrameWriter writer) throws HyracksDataException {
+        appender.write(writer, true);
+    }
+
+    public void releaseMemory() throws HyracksDataException {
+        leftBufferMngr.reset();
+        rightBufferMngr.reset();
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoinOperatorDescriptor.java
new file mode 100644
index 0000000..f8180f1
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoinOperatorDescriptor.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IMissingWriter;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
+import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class SortMergeJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
+    private static final int BUFFER_ACTIVITY_ID = 0;
+    private static final int MERGE_ACTIVITY_ID = 1;
+    private static final long serialVersionUID = 1L;
+
+    private final ITuplePairComparatorFactory comparatorFactory;
+    private final int memSize;
+    private final boolean isLeftOuter;
+    private final IMissingWriterFactory[] nullWriterFactories1;
+
+    public SortMergeJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            ITuplePairComparatorFactory comparatorFactory, RecordDescriptor 
recordDescriptor, int memSize,
+            boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) 
{
+        super(spec, 2, 1);
+        this.comparatorFactory = comparatorFactory;
+        this.outRecDescs[0] = recordDescriptor;
+        this.memSize = memSize;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        // consider both datasets are already sorted for now
+        ActivityId bufferAid = new ActivityId(getOperatorId(), 
BUFFER_ACTIVITY_ID);
+        ActivityId mergeAid = new ActivityId(getOperatorId(), 
MERGE_ACTIVITY_ID);
+        SortActivityNode bufferNode = new SortActivityNode(bufferAid, 
mergeAid);
+        MergeJoinActivityNode mergeJoinActivityNode = new 
MergeJoinActivityNode(mergeAid);
+
+        builder.addActivity(this, bufferNode);
+        builder.addSourceEdge(1, bufferNode, 0);
+
+        builder.addActivity(this, mergeJoinActivityNode);
+        builder.addSourceEdge(0, mergeJoinActivityNode, 0);
+
+        builder.addTargetEdge(0, mergeJoinActivityNode, 0);
+        builder.addBlockingEdge(bufferNode, mergeJoinActivityNode);
+    }
+
+    public static class SortMergeJoinTaskState extends AbstractStateObject {
+        private SortMergeJoin joiner;
+
+        private SortMergeJoinTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+    }
+
+    private class SortActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        private final ActivityId mergeAid;
+
+        public SortActivityNode(ActivityId id, ActivityId mergeAid) {
+            super(id);
+            this.mergeAid = mergeAid;
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, int partition, 
int nPartitions)
+                throws HyracksDataException {
+            final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
+            final RecordDescriptor rd0 = 
recordDescProvider.getInputRecordDescriptor(mergeAid, 0);
+            final RecordDescriptor rd1 = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+
+            final IMissingWriter[] nullWriters1 = isLeftOuter ? new 
IMissingWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = 
nullWriterFactories1[i].createMissingWriter();
+                }
+            }
+
+            return new AbstractUnaryInputSinkOperatorNodePushable() {
+                private SortMergeJoinTaskState state;
+                private VSizeFrame inFrame;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    state = new SortMergeJoinTaskState(jobletCtx.getJobId(), 
new TaskId(getActivityId(), partition));
+                    state.joiner = new SortMergeJoin(jobletCtx, new 
FrameTupleAccessor(rd0),
+                            new FrameTupleAccessor(rd1), memSize, isLeftOuter, 
nullWriters1);
+                    inFrame = new VSizeFrame(ctx);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
+                    inFrame.resize(buffer.capacity());
+                    ByteBuffer copyBuffer = inFrame.getBuffer();
+                    FrameUtils.copyAndFlip(buffer, copyBuffer);
+                    state.joiner.collectRightFrame(copyBuffer); // ← Just 
collect, don't cache to disk
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    // state and state.joiner can be null if open() fails
+                    if (state != null && state.joiner != null) {
+                        ctx.setStateObject(state);
+                    }
+                }
+            };
+        }
+    }
+
+    private class MergeJoinActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public MergeJoinActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, int partition, 
int nPartitions)
+                throws HyracksDataException {
+            return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private SortMergeJoinTaskState state;
+                boolean failed = false;
+
+                @Override
+                public void open() throws HyracksDataException {
+
+                    state = (SortMergeJoinTaskState) ctx
+                            .getStateObject(new TaskId(new 
ActivityId(getOperatorId(), BUFFER_ACTIVITY_ID), partition));
+                    
state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx));
+                    writer.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
+                    state.joiner.mergeWithLeftFrame(buffer, writer);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    failed = true;
+                    writer.fail();
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    if (failed) {
+                        closeOnFail();
+                        return;
+                    }
+                    try {
+                        try {
+                            state.joiner.completeJoin(writer);
+                        } finally {
+                            state.joiner.releaseMemory();
+                        }
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw e;
+                    } finally {
+                        writer.close();
+                    }
+                }
+
+                private void closeOnFail() {
+                    try {
+                        writer.close();
+                    } catch (Exception e) {
+                        //ignore
+                    }
+                }
+            };
+        }
+
+        @Override
+        public String getDisplayName() {
+            return super.getDisplayName();
+        }
+    }
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20034
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I29b62ad57b453f32c6390e3a1a98c407dff70680
Gerrit-Change-Number: 20034
Gerrit-PatchSet: 1
Gerrit-Owner: Janhavi Tripurwar <janhavi.tripur...@couchbase.com>
Gerrit-MessageType: newchange

Reply via email to