From Janhavi Tripurwar <janhavi.tripur...@couchbase.com>: Janhavi Tripurwar has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20034 )
Change subject: wip: merge join ...................................................................... wip: merge join Change-Id: I29b62ad57b453f32c6390e3a1a98c407dff70680 --- A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoinOperatorDescriptor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java A hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeJoinPOperator.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoin.java 6 files changed, 554 insertions(+), 2 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/34/20034/1 diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java index c052d58..fc9cdf9 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java @@ -84,5 +84,6 @@ WRITE_RESULT, INTERSECT, WINDOW, - WINDOW_STREAM + WINDOW_STREAM, + SORT_MERGE_JOIN } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeJoinPOperator.java 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeJoinPOperator.java new file mode 100644 index 0000000..747f30c --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeJoinPOperator.java @@ -0,0 +1,132 @@ +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; +import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.evaluators.TuplePairEvaluatorFactory; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; +import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.dataflow.std.join.SortMergeJoinOperatorDescriptor; + +public class SortMergeJoinPOperator extends AbstractJoinPOperator { + public SortMergeJoinPOperator(AbstractBinaryJoinOperator.JoinKind kind, JoinPartitioningType partitioningType) { + super(kind, partitioningType); + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.SORT_MERGE_JOIN; + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException { + if (partitioningType != JoinPartitioningType.BROADCAST) { + throw new NotImplementedException(partitioningType + " sort merge joins are not implemented."); + } + + StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2]; + + // TODO: leverage statistics to make better decisions. 
+ pv[0] = OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new StructuralPropertiesVector( + new RandomPartitioningProperty(context.getComputationNodeDomain()), null)); + pv[1] = OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new StructuralPropertiesVector( + new BroadcastPartitioningProperty(context.getComputationNodeDomain()), null)); + return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); + } + + @Override + public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) + throws AlgebricksException { + if (partitioningType != JoinPartitioningType.BROADCAST) { + throw new NotImplementedException(partitioningType + " nested loop joins are not implemented."); + } + + IPartitioningProperty pp; + AbstractLogicalOperator op = (AbstractLogicalOperator) iop; + + //copied for now + if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { + AbstractLogicalOperator leftOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + IPhysicalPropertiesVector leftOpProperties = leftOp.getPhysicalOperator().getDeliveredProperties(); + if (leftOpProperties == null) { + pp = null; + } else { + pp = leftOpProperties.getPartitioningProperty(); + } + } else { + pp = IPartitioningProperty.UNPARTITIONED; + } + + this.deliveredProperties = new StructuralPropertiesVector(pp, null); + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op; + RecordDescriptor recDescriptor = + JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context); + IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1]; + conditionInputSchemas[0] = propagatedSchema; + 
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider(); + IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(), + context.getTypeEnvironment(op), conditionInputSchemas, context); + ITuplePairComparatorFactory comparatorFactory = + new TuplePairEvaluatorFactory(cond, false, context.getBinaryBooleanInspectorFactory()); + IOperatorDescriptorRegistry spec = builder.getJobSpec(); + IOperatorDescriptor opDesc; + + int memSize = localMemoryRequirements.getMemoryBudgetInFrames(); + switch (kind) { + case INNER: + opDesc = new SortMergeJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false, + null); + break; + case LEFT_OUTER: + IMissingWriterFactory[] nonMatchWriterFactories = JobGenHelper.createMissingWriterFactories(context, + ((LeftOuterJoinOperator) join).getMissingValue(), inputSchemas[1].getSize()); + opDesc = new SortMergeJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true, + nonMatchWriterFactories); + break; + default: + throw new NotImplementedException(); + } + + opDesc.setSourceLocation(join.getSourceLocation()); + contributeOpDesc(builder, join, opDesc); + + ILogicalOperator src1 = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src1, 0, op, 0); + ILogicalOperator src2 = op.getInputs().get(1).getValue(); + builder.contributeGraphEdge(src2, 0, op, 1); + } + + @Override + public boolean isMicroOperator() { + return false; + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java index 085580f..e65e8c2 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java +++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java @@ -45,6 +45,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.HybridHashJoinPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryHashJoinPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeJoinPOperator; import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector; import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; import org.apache.hyracks.algebricks.core.config.AlgebricksConfig; @@ -123,7 +124,8 @@ } } else { warnIfCrossProduct(conditionExpr, op.getSourceLocation(), context); - setNestedLoopJoinOp(op); + // setNestedLoopJoinOp(op); + setSortMergeJoinOp(op); } } @@ -131,6 +133,10 @@ op.setPhysicalOperator(new NestedLoopJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST)); } + private static void setSortMergeJoinOp(AbstractBinaryJoinOperator op) { + op.setPhysicalOperator(new SortMergeJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST)); + } + private static void setHashJoinOp(AbstractBinaryJoinOperator op, JoinPartitioningType partitioningType, List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IOptimizationContext context) { op.setPhysicalOperator(new HybridHashJoinPOperator(op.getJoinKind(), partitioningType, sideLeft, sideRight, diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java index 831753f..e9a4b83 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java @@ -105,6 +105,7 @@ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) throws HyracksDataException { final IHyracksJobletContext jobletCtx = ctx.getJobletContext(); + //the following two lines takes schema of both the input streams/ datasets final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0); final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoin.java new file mode 100644 index 0000000..884a929 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoin.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.dataflow.std.join; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksJobletContext; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; +import org.apache.hyracks.api.dataflow.value.ITuplePairComparator; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy; +import org.apache.hyracks.dataflow.std.buffermanager.FrameFreeSlotPolicyFactory; +import org.apache.hyracks.dataflow.std.buffermanager.VariableFrameMemoryManager; +import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool; + +public class SortMergeJoin { + + private static final int MIN_FRAME_BUDGET_INNER_JOIN = 3; + private static final int MIN_FRAME_BUDGET_OUTER_JOIN = MIN_FRAME_BUDGET_INNER_JOIN + 1; + + private final FrameTupleAccessor accessorLeft; + private final FrameTupleAccessor accessorRight; + private final FrameTupleAppender appender; + private ITuplePairComparator tpComparator; + private final IFrame outBuffer; + private final IFrame innerBuffer; + + private final VariableFrameMemoryManager leftBufferMngr; + private final VariableFrameMemoryManager rightBufferMngr; + + public SortMergeJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorLeft, + FrameTupleAccessor accessorRight, int memBudgetInFrames, boolean isLeftOuter, + IMissingWriter[] missingWriters) throws HyracksDataException { + this(jobletContext, accessorLeft, accessorRight, memBudgetInFrames, isLeftOuter, missingWriters, false); + } + + public 
SortMergeJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorLeft, + FrameTupleAccessor accessorRight, int memBudgetInFrames, boolean isLeftOuter, + IMissingWriter[] missingWriters, boolean isRightOuter) throws HyracksDataException { + + this.accessorLeft = accessorLeft; //rd1 + this.accessorRight = accessorRight; //rd0 + this.appender = new FrameTupleAppender(); + this.outBuffer = new VSizeFrame(jobletContext); + this.innerBuffer = new VSizeFrame(jobletContext); + this.appender.reset(outBuffer, true); + + int minMemBudgetInFrames = isLeftOuter ? MIN_FRAME_BUDGET_OUTER_JOIN : MIN_FRAME_BUDGET_INNER_JOIN; + if (memBudgetInFrames < minMemBudgetInFrames) { + throw new HyracksDataException(ErrorCode.INSUFFICIENT_MEMORY); + } + + int outerBufferMngrMemBudgetInFrames = memBudgetInFrames - minMemBudgetInFrames + 1; + int outerBufferMngrMemBudgetInBytes = jobletContext.getInitialFrameSize() * outerBufferMngrMemBudgetInFrames; + this.rightBufferMngr = new VariableFrameMemoryManager( + new VariableFramePool(jobletContext, outerBufferMngrMemBudgetInBytes), FrameFreeSlotPolicyFactory + .createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, outerBufferMngrMemBudgetInFrames)); + + this.leftBufferMngr = new VariableFrameMemoryManager( + new VariableFramePool(jobletContext, outerBufferMngrMemBudgetInBytes), FrameFreeSlotPolicyFactory + .createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, outerBufferMngrMemBudgetInFrames)); + + } + + public void mergeTuples(IFrameWriter writer) throws HyracksDataException { + int leftCount = accessorLeft.getTupleCount(); + int rightCount = accessorRight.getTupleCount(); + int leftIndex = 0, rightIndex = 0; + int mark = -1; + + while (true) { + if (mark < 0) { + if (leftIndex >= leftCount || rightIndex >= rightCount) { + break; + } + while (leftIndex < leftCount && rightIndex < rightCount && compareTuples(leftIndex, rightIndex) < 0) { + leftIndex++; + } + while (leftIndex < leftCount && rightIndex < rightCount && compareTuples(leftIndex, 
rightIndex) > 0) { + rightIndex++; + } + mark = rightIndex; + } + + if (leftIndex < leftCount && rightIndex < rightCount && compareTuples(leftIndex, rightIndex) == 0) { + //join condition is satisfied + appendToResults(leftIndex, rightIndex, writer); + rightIndex++; + } else { + rightIndex = mark; + leftIndex++; + mark = -1; + } + } + } + + private int compareTuples(int leftIndex, int rightIndex) throws HyracksDataException { + int compareResult = tpComparator.compare(accessorLeft, leftIndex, accessorRight, rightIndex); + // Return 0 when join condition is satisfied + return compareResult; + + } + + public void mergeWithLeftFrame(ByteBuffer rightBuffer, IFrameWriter writer) throws HyracksDataException { + accessorLeft.reset(rightBuffer); + if (accessorLeft.getTupleCount() <= 0) { + return; + } + mergeTuples(writer); + } + + private void appendToResults(int leftTupleId, int rightTupleId, IFrameWriter writer) throws HyracksDataException { + FrameUtils.appendConcatToWriter(writer, appender, accessorLeft, leftTupleId, accessorRight, rightTupleId); + } + + /** + * Collect left frames using buffer manager + */ + public void collectLeftFrame(ByteBuffer buffer) throws HyracksDataException { + accessorLeft.reset(buffer); + if (accessorLeft.getTupleCount() > 0) { + // Insert frame into buffer manager + if (leftBufferMngr.insertFrame(buffer) < 0) { + throw new HyracksDataException("Cannot insert left frame - buffer full"); + } + } + } + + public void collectRightFrame(ByteBuffer buffer) throws HyracksDataException { + accessorRight.reset(buffer); + if (accessorRight.getTupleCount() > 0) { + // Insert frame into buffer manager + if (rightBufferMngr.insertFrame(buffer) < 0) { + throw new HyracksDataException("Cannot insert right frame - buffer full"); + } + } + } + + /** + * Must be called before starting to join to set the right comparator with the right context. 
+ * + * @param comparator the comparator to use for comparing the probe tuples against the build tuples + */ + void setComparator(ITuplePairComparator comparator) { + tpComparator = comparator; + } + + public void completeJoin(IFrameWriter writer) throws HyracksDataException { + appender.write(writer, true); + } + + public void releaseMemory() throws HyracksDataException { + leftBufferMngr.reset(); + rightBufferMngr.reset(); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoinOperatorDescriptor.java new file mode 100644 index 0000000..f8180f1 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/SortMergeJoinOperatorDescriptor.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.dataflow.std.join; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksJobletContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.ActivityId; +import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.TaskId; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; +import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractStateObject; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; + +public class SortMergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { + private static final int BUFFER_ACTIVITY_ID = 0; + private static final int MERGE_ACTIVITY_ID = 1; + private static final long serialVersionUID = 1L; + + private final ITuplePairComparatorFactory comparatorFactory; + private final int memSize; + private final boolean isLeftOuter; + private final IMissingWriterFactory[] nullWriterFactories1; + + public 
SortMergeJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, + ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize, + boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) { + super(spec, 2, 1); + this.comparatorFactory = comparatorFactory; + this.outRecDescs[0] = recordDescriptor; + this.memSize = memSize; + this.isLeftOuter = isLeftOuter; + this.nullWriterFactories1 = nullWriterFactories1; + } + + @Override + public void contributeActivities(IActivityGraphBuilder builder) { + // consider both datasets are already sorted for now + ActivityId bufferAid = new ActivityId(getOperatorId(), BUFFER_ACTIVITY_ID); + ActivityId mergeAid = new ActivityId(getOperatorId(), MERGE_ACTIVITY_ID); + SortActivityNode bufferNode = new SortActivityNode(bufferAid, mergeAid); + MergeJoinActivityNode mergeJoinActivityNode = new MergeJoinActivityNode(mergeAid); + + builder.addActivity(this, bufferNode); + builder.addSourceEdge(1, bufferNode, 0); + + builder.addActivity(this, mergeJoinActivityNode); + builder.addSourceEdge(0, mergeJoinActivityNode, 0); + + builder.addTargetEdge(0, mergeJoinActivityNode, 0); + builder.addBlockingEdge(bufferNode, mergeJoinActivityNode); + } + + public static class SortMergeJoinTaskState extends AbstractStateObject { + private SortMergeJoin joiner; + + private SortMergeJoinTaskState(JobId jobId, TaskId taskId) { + super(jobId, taskId); + } + } + + private class SortActivityNode extends AbstractActivityNode { + private static final long serialVersionUID = 1L; + + private final ActivityId mergeAid; + + public SortActivityNode(ActivityId id, ActivityId mergeAid) { + super(id); + this.mergeAid = mergeAid; + } + + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) + throws HyracksDataException { + final IHyracksJobletContext jobletCtx = ctx.getJobletContext(); + final RecordDescriptor rd0 = 
recordDescProvider.getInputRecordDescriptor(mergeAid, 0); + final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); + + final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null; + if (isLeftOuter) { + for (int i = 0; i < nullWriterFactories1.length; i++) { + nullWriters1[i] = nullWriterFactories1[i].createMissingWriter(); + } + } + + return new AbstractUnaryInputSinkOperatorNodePushable() { + private SortMergeJoinTaskState state; + private VSizeFrame inFrame; + + @Override + public void open() throws HyracksDataException { + state = new SortMergeJoinTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition)); + state.joiner = new SortMergeJoin(jobletCtx, new FrameTupleAccessor(rd0), + new FrameTupleAccessor(rd1), memSize, isLeftOuter, nullWriters1); + inFrame = new VSizeFrame(ctx); + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + inFrame.resize(buffer.capacity()); + ByteBuffer copyBuffer = inFrame.getBuffer(); + FrameUtils.copyAndFlip(buffer, copyBuffer); + state.joiner.collectRightFrame(copyBuffer); // ← Just collect, don't cache to disk + } + + @Override + public void fail() throws HyracksDataException { + + } + + @Override + public void close() throws HyracksDataException { + // state and state.joiner can be null if open() fails + if (state != null && state.joiner != null) { + ctx.setStateObject(state); + } + } + }; + } + } + + private class MergeJoinActivityNode extends AbstractActivityNode { + private static final long serialVersionUID = 1L; + + public MergeJoinActivityNode(ActivityId id) { + super(id); + } + + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) + throws HyracksDataException { + return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { + private SortMergeJoinTaskState state; + boolean 
failed = false; + + @Override + public void open() throws HyracksDataException { + + state = (SortMergeJoinTaskState) ctx + .getStateObject(new TaskId(new ActivityId(getOperatorId(), BUFFER_ACTIVITY_ID), partition)); + state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx)); + writer.open(); + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + state.joiner.mergeWithLeftFrame(buffer, writer); + } + + @Override + public void fail() throws HyracksDataException { + failed = true; + writer.fail(); + } + + @Override + public void close() throws HyracksDataException { + if (failed) { + closeOnFail(); + return; + } + try { + try { + state.joiner.completeJoin(writer); + } finally { + state.joiner.releaseMemory(); + } + } catch (Exception e) { + writer.fail(); + throw e; + } finally { + writer.close(); + } + } + + private void closeOnFail() { + try { + writer.close(); + } catch (Exception e) { + //ignore + } + } + }; + } + + @Override + public String getDisplayName() { + return super.getDisplayName(); + } + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20034 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I29b62ad57b453f32c6390e3a1a98c407dff70680 Gerrit-Change-Number: 20034 Gerrit-PatchSet: 1 Gerrit-Owner: Janhavi Tripurwar <janhavi.tripur...@couchbase.com> Gerrit-MessageType: newchange