http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java new file mode 100644 index 0000000..e9f9db0 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector; +import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; +import org.apache.hyracks.api.dataflow.value.ITuplePairComparator; +import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor; + +/** + * The right input is broadcast and the left input can be partitioned in any way. + */ +public class NestedLoopJoinPOperator extends AbstractJoinPOperator { + + private final int memSize; + + public NestedLoopJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) { + super(kind, partitioningType); + this.memSize = memSize; + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.NESTED_LOOP; + } + + @Override + public boolean isMicroOperator() { + return false; + } + + @Override + public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) { + if (partitioningType != JoinPartitioningType.BROADCAST) { + throw new NotImplementedException(partitioningType + " nested loop joins are not implemented."); + } + + IPartitioningProperty pp; + + AbstractLogicalOperator op = (AbstractLogicalOperator) iop; + + if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(1).getValue(); + IPhysicalPropertiesVector pv1 = op2.getPhysicalOperator().getDeliveredProperties(); + if (pv1 == null) { + pp = null; + } else { + pp = pv1.getPartitioningProperty(); + } + } else { + pp = IPartitioningProperty.UNPARTITIONED; + } + + // Nested loop join cannot maintain the local structure property for the probe side + // because of the I/O optimization for the build branch. + this.deliveredProperties = new StructuralPropertiesVector(pp, null); + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { + if (partitioningType != JoinPartitioningType.BROADCAST) { + throw new NotImplementedException(partitioningType + " nested loop joins are not implemented."); + } + + StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2]; + + // TODO: leverage statistics to make better decisions. + pv[0] = new StructuralPropertiesVector(null, null); + pv[1] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(context.getComputationNodeDomain()), + null); + return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op; + RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), + propagatedSchema, context); + IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1]; + conditionInputSchemas[0] = propagatedSchema; + IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider(); + IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(), + context.getTypeEnvironment(op), conditionInputSchemas, context); + ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond, + context.getBinaryBooleanInspectorFactory()); + IOperatorDescriptorRegistry spec = builder.getJobSpec(); + IOperatorDescriptor opDesc = null; + + switch (kind) { + case INNER: { + opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false, + null); + break; + } + case LEFT_OUTER: { + IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()]; + for (int j = 0; j < nonMatchWriterFactories.length; j++) { + nonMatchWriterFactories[j] = context.getMissingWriterFactory(); + } + opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true, + nonMatchWriterFactories); + break; + } + default: { + throw new NotImplementedException(); + } + } + contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); + + ILogicalOperator src1 = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src1, 0, op, 0); + ILogicalOperator src2 = op.getInputs().get(1).getValue(); + builder.contributeGraphEdge(src2, 0, op, 1); + } + + public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory { + + private static final long serialVersionUID = 1L; + private final IScalarEvaluatorFactory cond; + private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory; + + public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond, + IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) { + this.cond = cond; + this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory; + } + + @Override + public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) { + return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx)); + } + } + + public static class TuplePairEvaluator implements ITuplePairComparator { + private final IHyracksTaskContext ctx; + private IScalarEvaluator condEvaluator; + private final IScalarEvaluatorFactory condFactory; + private final IPointable p; + private final CompositeFrameTupleReference compositeTupleRef; + private final FrameTupleReference leftRef; + private final FrameTupleReference rightRef; + private final IBinaryBooleanInspector binaryBooleanInspector; + + public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory, + IBinaryBooleanInspector binaryBooleanInspector) { + this.ctx = ctx; + this.condFactory = condFactory; + this.binaryBooleanInspector = binaryBooleanInspector; + this.leftRef = new FrameTupleReference(); + this.p = VoidPointable.FACTORY.createPointable(); + this.rightRef = new FrameTupleReference(); + this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef); + } + + @Override + public int compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor, + int innerIndex) throws HyracksDataException { + if (condEvaluator == null) { + try { + this.condEvaluator = condFactory.createScalarEvaluator(ctx); + } catch (AlgebricksException ae) { + throw new HyracksDataException(ae); + } + } + compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex); + try { + condEvaluator.evaluate(compositeTupleRef, p); + } catch (AlgebricksException ae) { + throw new HyracksDataException(ae); + } + boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(), + p.getLength()); + if (result) { + return 0; + } else { + return 1; + } + } + } + + public static class CompositeFrameTupleReference implements IFrameTupleReference { + + private final FrameTupleReference refLeft; + private final FrameTupleReference refRight; + + public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) { + this.refLeft = refLeft; + this.refRight = refRight; + } + + public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor, + int innerIndex) { + refLeft.reset(outerAccessor, outerIndex); + refRight.reset(innerAccessor, innerIndex); + } + + @Override + public int getFieldCount() { + return refLeft.getFieldCount() + refRight.getFieldCount(); + } + + @Override + public byte[] getFieldData(int fIdx) { + int leftFieldCount = refLeft.getFieldCount(); + if (fIdx < leftFieldCount) { + return refLeft.getFieldData(fIdx); + } else { + return refRight.getFieldData(fIdx - leftFieldCount); + } + } + + @Override + public int getFieldStart(int fIdx) { + int leftFieldCount = refLeft.getFieldCount(); + if (fIdx < leftFieldCount) { + return refLeft.getFieldStart(fIdx); + } else { + return refRight.getFieldStart(fIdx - leftFieldCount); + } + } + + @Override + public int getFieldLength(int fIdx) { + int leftFieldCount = refLeft.getFieldCount(); + if (fIdx < leftFieldCount) { + return refLeft.getFieldLength(fIdx); + } else { + return refRight.getFieldLength(fIdx - leftFieldCount); + } + } + + @Override + public IFrameTupleAccessor getFrameTupleAccessor() { + throw new NotImplementedException(); + } + + @Override + public int getTupleIndex() { + throw new NotImplementedException(); + } + + } +}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java index d6bd554..8299f61 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java @@ -79,6 +79,14 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator { return partitioningFields; } + public RangePartitioningType getRangeType() { + return rangeType; + } + + public IRangeMap getRangeMap() { + return rangeMap; + } + public INodeDomain getDomain() { return domain; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java index ef37d8c..9667427 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java @@ -82,6 +82,14 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera return partitioningFields; } + public RangePartitioningType getRangeType() { + return rangeType; + } + + public IRangeMap getRangeMap() { + return rangeMap; + } + public INodeDomain getDomain() { return domain; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java index 14a8f16..db778f7 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java @@ -89,7 +89,7 @@ public class ReplicatePOperator extends AbstractPhysicalOperator { outputDependencyLabels[i] = 1; } } - return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels); + return new Pair<>(inputDependencyLabels, outputDependencyLabels); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java index e6517d0..823294e 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java @@ -21,15 +21,12 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical; import java.util.ArrayList; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Triple; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; -import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java index fee9174..9d2a5da 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java @@ -36,9 +36,9 @@ public class PropagatingTypeEnvironment extends AbstractTypeEnvironment { private final ITypeEnvPointer[] envPointers; - private final List<LogicalVariable> nonNullVariables = new ArrayList<LogicalVariable>(); + private final List<LogicalVariable> nonNullVariables = new ArrayList<>(); - private final List<List<LogicalVariable>> correlatedNullableVariableLists = new ArrayList<List<LogicalVariable>>(); + private final List<List<LogicalVariable>> correlatedNullableVariableLists = new ArrayList<>(); public PropagatingTypeEnvironment(IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer nullableTypeComputer, IMetadataProvider<?, ?> metadataProvider, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java index 02e4c8a..45f1b76 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java @@ -44,7 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.Log import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType; import org.apache.hyracks.algebricks.core.algebra.operators.physical.HybridHashJoinPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryHashJoinPOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.physical.NLJoinPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator; import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector; import org.apache.hyracks.algebricks.core.config.AlgebricksConfig; @@ -83,8 +83,8 @@ public class JoinUtils { } private static void setNLJoinOp(AbstractBinaryJoinOperator op, IOptimizationContext context) { - op.setPhysicalOperator(new NLJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, context - .getPhysicalOptimizationConfig().getMaxRecordsPerFrame())); + op.setPhysicalOperator(new NestedLoopJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, context + .getPhysicalOptimizationConfig().getMaxFramesForJoin())); } private static void setHashJoinOp(AbstractBinaryJoinOperator op, JoinPartitioningType partitioningType, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java index 9ce59ae..5290af4 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java @@ -140,9 +140,9 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor { @Override public void flush() throws HyracksDataException { - for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { - writers[i].flush(); - } +// for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { +// writers[i].flush(); +// } } @Override