Preston Carman has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/424
Change subject: Range connector update and new merge interval join. ...................................................................... Range connector update and new merge interval join. Change-Id: Ie6bca14b81c41859f0f89cdc1fc95efd393e79f3 --- A asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/interval/SortMergeIntervalJoinPOperator.java M asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java M asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java A asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java M asterix-app/data/csv/sample_01.csv A asterix-app/src/test/resources/runtimets/queries/temporal/TemporalQueries.xml A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.1.ddl.aql A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.2.update.aql A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.3.query.aql A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.4.query.aql A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.5.query.aql A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.6.query.aql A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.7.query.aql A asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.3.adm A asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.4.adm A asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.5.adm A 
asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.6.adm A asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.7.adm M asterix-app/src/test/resources/runtimets/testsuite.xml M asterix-aql/src/main/java/org/apache/asterix/aql/util/RangeMapBuilder.java M asterix-aql/src/main/javacc/AQL.jj A asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java M asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java M asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AIntervalPartialBinaryComparatorFactory.java A asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/AllenRelationsBinaryComparatorFactoryProvider.java A asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/OverlapIntervalBinaryComparatorFactory.java A asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalProjectBinaryComparatorFactory.java A asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalReplicateBinaryComparatorFactory.java A asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalSplitBinaryComparatorFactory.java M asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AIntervalSerializerDeserializer.java A asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AIntervalPointable.java A asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoinLocks.java A asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoinOperatorDescriptor.java A 
asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoiner.java A asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalStatus.java M pom.xml 36 files changed, 1,958 insertions(+), 171 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/24/424/1 diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/interval/SortMergeIntervalJoinPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/interval/SortMergeIntervalJoinPOperator.java new file mode 100644 index 0000000..ae6d71c --- /dev/null +++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/interval/SortMergeIntervalJoinPOperator.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.algebra.operators.physical.interval; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.runtime.operators.interval.SortMergeIntervalJoinOperatorDescriptor; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; +import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; +import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType; + +public class SortMergeIntervalJoinPOperator extends AbstractJoinPOperator { + + private final int memSize; + protected final List<LogicalVariable> keysLeftBranch; + protected final List<LogicalVariable> keysRightBranch; + private final IBinaryComparatorFactoryProvider bcfp; + private IRangeMap rangeMap; + + public SortMergeIntervalJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize, + List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IBinaryComparatorFactoryProvider bcfp, + IRangeMap rangeMap) { + super(kind, partitioningType); + this.memSize = memSize; + this.keysLeftBranch = sideLeft; + this.keysRightBranch = sideRight; + this.bcfp = bcfp; + this.rangeMap = rangeMap; + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.SORT_MERGE_INTERVAL_JOIN; + } + + @Override + public boolean isMicroOperator() { + return false; + } + + @Override + public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) { + IPartitioningProperty pp = null; + ArrayList<OrderColumn> order = new ArrayList<OrderColumn>(); + for (LogicalVariable v : keysLeftBranch) { + order.add(new OrderColumn(v, OrderKind.ASC)); + } + pp = new OrderedPartitionedProperty(order, null, rangeMap, RangePartitioningType.PROJECT); + 
List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>(); + propsLocal.add(new LocalOrderProperty(order)); + deliveredProperties = new StructuralPropertiesVector(pp, propsLocal); + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop, + IPhysicalPropertiesVector reqdByParent) { + StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2]; + AbstractLogicalOperator op = (AbstractLogicalOperator) iop; + + IPartitioningProperty ppLeft = null; + List<ILocalStructuralProperty> ispLeft = new ArrayList<ILocalStructuralProperty>(); + IPartitioningProperty ppRight = null; + List<ILocalStructuralProperty> ispRight = new ArrayList<ILocalStructuralProperty>(); + if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { + ArrayList<OrderColumn> orderLeft = new ArrayList<OrderColumn>(); + for (LogicalVariable v : keysLeftBranch) { + orderLeft.add(new OrderColumn(v, OrderKind.ASC)); + } + ppLeft = new OrderedPartitionedProperty(orderLeft, null, rangeMap, RangePartitioningType.PROJECT); + ispLeft.add(new LocalOrderProperty(orderLeft)); + + ArrayList<OrderColumn> orderRight = new ArrayList<OrderColumn>(); + for (LogicalVariable v : keysRightBranch) { + orderRight.add(new OrderColumn(v, OrderKind.ASC)); + } + ppRight = new OrderedPartitionedProperty(orderRight, null, rangeMap, RangePartitioningType.SPLIT); + ispRight.add(new LocalOrderProperty(orderRight)); + } + + pv[0] = new StructuralPropertiesVector(ppLeft, ispLeft); + pv[1] = new StructuralPropertiesVector(ppRight, ispRight); + IPartitioningRequirementsCoordinator prc = IPartitioningRequirementsCoordinator.NO_COORDINATION; + return new PhysicalRequirements(pv, prc); + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + 
int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]); + int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]); + IVariableTypeEnvironment env = context.getTypeEnvironment(op); + IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length]; + int i = 0; + for (LogicalVariable v : keysLeftBranch) { + Object t = env.getVarType(v); + comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true); + } + + IOperatorDescriptorRegistry spec = builder.getJobSpec(); + RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, + context); + + SortMergeIntervalJoinOperatorDescriptor opDesc = new SortMergeIntervalJoinOperatorDescriptor(spec, memSize, + recordDescriptor, keysLeft, keysRight, comparatorFactories); + contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); + + ILogicalOperator src1 = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src1, 0, op, 0); + ILogicalOperator src2 = op.getInputs().get(1).getValue(); + builder.contributeGraphEdge(src2, 0, op, 1); + } +} diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java index a1c9d8c..940a814 100644 --- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java +++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java @@ -289,8 +289,8 @@ //Turned off the following rule for now not to change OptimizerTest results. 
//physicalRewritesAllLevels.add(new IntroduceTransactionCommitByAssignOpRule()); physicalRewritesAllLevels.add(new ReplaceSinkOpWithCommitOpRule()); - physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule()); physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule()); + physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule()); physicalRewritesAllLevels.add(new IntroduceInstantLockSearchCallbackRule()); physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule()); physicalRewritesAllLevels.add(new EnforceStructuralPropertiesRule()); diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java index 353c9be..9feaa09 100644 --- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java +++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java @@ -33,6 +33,7 @@ import org.apache.asterix.om.functions.AsterixBuiltinFunctions; import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams; import org.apache.asterix.optimizer.rules.am.BTreeJobGenParams; +import org.apache.asterix.optimizer.rules.util.JoinUtils; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -61,7 +62,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; -import org.apache.hyracks.algebricks.rewriter.util.JoinUtils; public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule { diff --git 
a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java new file mode 100644 index 0000000..d17e523 --- /dev/null +++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.optimizer.rules.util; + +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.logging.Logger; + +import org.apache.asterix.algebra.operators.physical.interval.SortMergeIntervalJoinPOperator; +import org.apache.asterix.common.annotations.IntervalJoinExpressionAnnotation; +import org.apache.asterix.dataflow.data.nontagged.comparators.allenrelations.AllenRelationsBinaryComparatorFactoryProvider; +import org.apache.asterix.om.functions.AsterixBuiltinFunctions; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation; +import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType; +import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; + +public class JoinUtils { + + private static final Logger LOGGER = Logger.getLogger(JoinUtils.class.getName()); + + public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, IOptimizationContext context) + throws AlgebricksException { + List<LogicalVariable> sideLeft = new LinkedList<LogicalVariable>(); 
+ List<LogicalVariable> sideRight = new LinkedList<LogicalVariable>(); + List<LogicalVariable> varsLeft = op.getInputs().get(0).getValue().getSchema(); + List<LogicalVariable> varsRight = op.getInputs().get(1).getValue().getSchema(); + ILogicalExpression conditionLE = op.getCondition().getValue(); + if (conditionLE.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { + return; + } + AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) conditionLE; + if (isIntervalJoinCondition(fexp, varsLeft, varsRight, sideLeft, sideRight)) { + IntervalJoinExpressionAnnotation ijea = getIntervalJoinAnnotation(fexp); + if (ijea == null) { + // Use default join method. + return; + } + if (ijea.isMergeJoin()) { + // Sort Merge. + LOGGER.fine("Interval Join - Merge"); + setSortMergeIntervalJoinOp(op, sideLeft, sideRight, ijea.getRangeMap(), context); + } else if (ijea.isIopJoin()) { + // Overlapping Interval Partition. + LOGGER.fine("Interval Join - IOP"); + } else if (ijea.isSpatialJoin()) { + // Spatial Partition. 
+ LOGGER.fine("Interval Join - Spatial Partitioning"); + } + } + } + + private static IntervalJoinExpressionAnnotation getIntervalJoinAnnotation(AbstractFunctionCallExpression fexp) { + Iterator<IExpressionAnnotation> annotationIter = fexp.getAnnotations().values().iterator(); + while (annotationIter.hasNext()) { + IExpressionAnnotation annotation = annotationIter.next(); + if (annotation instanceof IntervalJoinExpressionAnnotation) { + return (IntervalJoinExpressionAnnotation) annotation; + } + } + return null; + } + + private static void setSortMergeIntervalJoinOp(AbstractBinaryJoinOperator op, List<LogicalVariable> sideLeft, + List<LogicalVariable> sideRight, IRangeMap rangeMap, IOptimizationContext context) { + IBinaryComparatorFactoryProvider bcfp = (IBinaryComparatorFactoryProvider) AllenRelationsBinaryComparatorFactoryProvider.INSTANCE; + op.setPhysicalOperator(new SortMergeIntervalJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, + context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame(), sideLeft, sideRight, bcfp, rangeMap)); + } + + private static boolean isIntervalJoinCondition(ILogicalExpression e, Collection<LogicalVariable> inLeftAll, + Collection<LogicalVariable> inRightAll, Collection<LogicalVariable> outLeftFields, + Collection<LogicalVariable> outRightFields) { + switch (e.getExpressionTag()) { + case FUNCTION_CALL: { + AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) e; + FunctionIdentifier fi = fexp.getFunctionIdentifier(); + if (!fi.equals(AsterixBuiltinFunctions.INTERVAL_OVERLAPS)) { + return false; + } + ILogicalExpression opLeft = fexp.getArguments().get(0).getValue(); + ILogicalExpression opRight = fexp.getArguments().get(1).getValue(); + if (opLeft.getExpressionTag() != LogicalExpressionTag.VARIABLE + || opRight.getExpressionTag() != LogicalExpressionTag.VARIABLE) { + return false; + } + LogicalVariable var1 = ((VariableReferenceExpression) opLeft).getVariableReference(); + if 
(inLeftAll.contains(var1) && !outLeftFields.contains(var1)) { + outLeftFields.add(var1); + } else if (inRightAll.contains(var1) && !outRightFields.contains(var1)) { + outRightFields.add(var1); + } else { + return false; + } + LogicalVariable var2 = ((VariableReferenceExpression) opRight).getVariableReference(); + if (inLeftAll.contains(var2) && !outLeftFields.contains(var2)) { + outLeftFields.add(var2); + } else if (inRightAll.contains(var2) && !outRightFields.contains(var2)) { + outRightFields.add(var2); + } else { + return false; + } + return true; + } + default: { + return false; + } + } + } +} diff --git a/asterix-app/data/csv/sample_01.csv b/asterix-app/data/csv/sample_01.csv index 4dd437a..fbba382 100644 --- a/asterix-app/data/csv/sample_01.csv +++ b/asterix-app/data/csv/sample_01.csv @@ -1,8 +1,8 @@ -1,0.899682764,5.6256,2013-08-07,07:22:35,1979-02-25T23:48:27.034 -2,0.669052398,,-1923-03-29,19:33:34,-1979-02-25T23:48:27.002 -3,0.572733058,192674,-1923-03-28,19:33:34,-1979-02-25T23:48:27.001 -4,,192674,-1923-03-27,19:33:34,-1979-02-25T23:48:27.001 -5,0.572733058,192674,,19:33:34,-1979-02-25T23:48:27.001 -6,0.572733058,192674,-1923-03-25,,-1979-02-25T23:48:27.001 -7,0.572733058,192674,-1923-03-24,19:33:34, +1,0.899682764,5.6256,2013-08-07,07:22:35,1979-02-25T23:48:27.034 +2,0.669052398,,-1923-03-29,19:33:34,-1979-02-25T23:48:27.002 +3,0.572733058,192674,-1923-03-28,19:33:34,-1979-02-25T23:48:27.001 +4,,192674,-1923-03-27,19:33:34,-1979-02-25T23:48:27.001 +5,0.572733058,192674,,19:33:34,-1979-02-25T23:48:27.001 +6,0.572733058,192674,-1923-03-25,,-1979-02-25T23:48:27.001 +7,0.572733058,192674,-1923-03-24,19:33:34, 8,,,,, \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/TemporalQueries.xml b/asterix-app/src/test/resources/runtimets/queries/temporal/TemporalQueries.xml new file mode 100644 index 0000000..04a5329 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/temporal/TemporalQueries.xml @@ 
-0,0 +1,156 @@ +<!-- + ! Copyright 2009-2013 by The Regents of the University of California + ! Licensed under the Apache License, Version 2.0 (the "License"); + ! you may not use this file except in compliance with the License. + ! you may obtain a copy of the License from + ! + ! http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, software + ! distributed under the License is distributed on an "AS IS" BASIS, + ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ! See the License for the specific language governing permissions and + ! limitations under the License. + !--> + <test-case FilePath="temporal"> + <compilation-unit name="overlap_bins_gby_3"> + <output-dir compare="Text">overlap_bins_gby_3</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="agg_01"> + <output-dir compare="Text">agg_01</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="overlap_bins_gby_1"> + <output-dir compare="Text">overlap_bins_gby_1</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="duration_functions"> + <output-dir compare="Text">duration_functions</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="overlap_bins_gby_0"> + <output-dir compare="Text">overlap_bins_gby_0</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="get_overlapping_interval"> + <output-dir compare="Text">get_overlapping_interval</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="overlap_bins"> + <output-dir compare="Text">overlap_bins</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="parse_02"> + <output-dir 
compare="Text">parse_02</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="parse_01"> + <output-dir compare="Text">parse_01</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="day_of_week_01"> + <output-dir compare="Text">day_of_week_01</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="interval_bin"> + <output-dir compare="Text">interval_bin</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="interval_bin_gby_0"> + <output-dir compare="Text">interval_bin_gby_0</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="interval_bin_gby_1"> + <output-dir compare="Text">interval_bin_gby_1</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="accessors"> + <output-dir compare="Text">accessors</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="accessors_interval"> + <output-dir compare="Text">accessors_interval</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="accessors_interval_null"> + <output-dir compare="Text">accessors_interval_null</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="adjust_timezone"> + <output-dir compare="Text">adjust_timezone</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="calendar_duration"> + <output-dir compare="Text">calendar_duration</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="date_functions"> + <output-dir compare="Text">date_functions</output-dir> + </compilation-unit> + </test-case> + <test-case 
FilePath="temporal"> + <compilation-unit name="datetime_functions"> + <output-dir compare="Text">datetime_functions</output-dir> + </compilation-unit> + </test-case> + <!-- + <test-case FilePath="temporal"> + <compilation-unit name="insert_from_delimited_ds"> + <output-dir compare="Text">insert_from_delimited_ds</output-dir> + </compilation-unit> + </test-case> + --> + <test-case FilePath="temporal"> + <compilation-unit name="insert_from_ext_ds"> + <output-dir compare="Text">insert_from_ext_ds</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="insert_from_ext_ds_2"> + <output-dir compare="Text">insert_from_ext_ds_2</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="interval_functions"> + <output-dir compare="Text">interval_functions</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal/interval_joins"> + <compilation-unit name="interval_overlaps"> + <output-dir compare="Text">interval_overlaps</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="time_functions"> + <output-dir compare="Text">time_functions</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="constructor"> + <compilation-unit name="interval"> + <output-dir compare="Text">interval</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="temporal"> + <compilation-unit name="duration_comps"> + <output-dir compare="Text">duration_comps</output-dir> + </compilation-unit> + </test-case> diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.1.ddl.aql new file mode 100644 index 0000000..f494ebe --- /dev/null +++ 
b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.1.ddl.aql @@ -0,0 +1,26 @@ +/* + * Description : Check temporal join functionality for interval + * Expected Result : Success + * Date : 26th Jun, 2015 + */ + +drop dataverse TinyCollege if exists; +create dataverse TinyCollege; +use dataverse TinyCollege; + +create type StaffType as open { + name: string, + office: string, + employment: interval +} +create dataset Staff(StaffType) +primary key name; + + +create type StudentType as open { + name: string, + office: string, + attendance: interval +} +create dataset Students(StudentType) +primary key name; diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.2.update.aql new file mode 100644 index 0000000..0b7b8fa --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.2.update.aql @@ -0,0 +1,22 @@ +/* + * Description : Check temporal join functionality for interval + * Expected Result : Success + * Date : 26th Jun, 2015 + */ + +use dataverse TinyCollege; + +insert into dataset Staff ({"name":"Alex", "office":"A", "employment":interval-from-date(date("2003-01-01"), date("2008-01-01"))}); +insert into dataset Staff ({"name":"Elisabeth", "office":"B", "employment":interval-from-date(date("2002-01-01"), date("2010-01-01"))}); +insert into dataset Staff ({"name":"Frank", "office":"A", "employment":interval-from-date(date("2004-01-01"), date("2009-01-01"))}); +insert into dataset Staff ({"name":"Henry", "office":"C", "employment":interval-from-date(date("2003-01-01"), date("2008-01-01"))}); +insert into dataset Staff ({"name":"Mary", "office":"B", "employment":interval-from-date(date("2006-01-01"), date("2010-01-01"))}); +insert into dataset Staff 
({"name":"Vicky", "office":"D", "employment":interval-from-date(date("2001-01-01"), date("2010-01-01"))}); + +insert into dataset Students ({"name":"Charles", "office":"X", "attendance":interval-from-date(date("2001-01-01"), date("2004-01-01"))}); +insert into dataset Students ({"name":"Frank", "office":"Y", "attendance":interval-from-date(date("2001-01-01"), date("2004-01-01"))}); +insert into dataset Students ({"name":"Karen", "office":"Y", "attendance":interval-from-date(date("2007-01-01"), date("2009-01-01"))}); +insert into dataset Students ({"name":"Mary", "office":"Y", "attendance":interval-from-date(date("2002-01-01"), date("2005-01-01"))}); +insert into dataset Students ({"name":"Olga", "office":"Z", "attendance":interval-from-date(date("2001-01-01"), date("2003-01-01"))}); +insert into dataset Students ({"name":"Steve", "office":"Z", "attendance":interval-from-date(date("2007-01-01"), date("2010-01-01"))}); + diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.3.query.aql new file mode 100644 index 0000000..c8b2d5e --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.3.query.aql @@ -0,0 +1,14 @@ +/* + * Description : Check temporal join functionality for interval + * Expected Result : Success + * Date : 26th Jun, 2015 + */ + +use dataverse TinyCollege; + +for $f in dataset Staff +for $d in dataset Students +where interval-overlaps($f.employment, $d.attendance) +/*+ range ["F", "L", "R"] */ +order by $f.name, $d.name +return { "staff" : $f.name, "student" : $d.name } diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.4.query.aql 
b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.4.query.aql new file mode 100644 index 0000000..22e080b --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.4.query.aql @@ -0,0 +1,14 @@ +/* + * Description : Check temporal join functionality for interval + * Expected Result : Success + * Date : 26th Jun, 2015 + */ + +use dataverse TinyCollege; + +for $f in dataset Staff +for $d in dataset Students +where /*+ interval-merge-join [991353600000, 1054425600000, 1149120000000] */ interval-overlaps($f.employment, $d.attendance) +/*+ range ["F", "L", "R"] */ +order by $f.name, $d.name +return { "staff" : $f.name, "student" : $d.name } diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.5.query.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.5.query.aql new file mode 100644 index 0000000..eb5c5c1 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.5.query.aql @@ -0,0 +1,16 @@ +/* + * Description : Check temporal join functionality for interval + * Expected Result : Success + * Date : 26th Jun, 2015 + */ + +use dataverse TinyCollege; + +/* +for $f in dataset Staff +for $d in dataset Students +where /*+ interval-iop-join [991353600000, 1054425600000, 1149120000000] */ interval-overlaps($f.employment, $d.attendance) +/*+ range ["F", "L", "R"] */ +order by $f.name, $d.name +return { "staff" : $f.name, "student" : $d.name } +*/ \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.6.query.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.6.query.aql new file mode 100644 
index 0000000..86a2587 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.6.query.aql @@ -0,0 +1,16 @@ +/* + * Description : Check temporal join functionality for interval + * Expected Result : Success + * Date : 26th Jun, 2015 + */ + +use dataverse TinyCollege; + +/* +for $f in dataset Staff +for $d in dataset Students +where /*+ interval-spatial-join [991353600000, 1054425600000, 1149120000000] */ interval-overlaps($f.employment, $d.attendance) +/*+ range ["F", "L", "R"] */ +order by $f.name, $d.name +return { "staff" : $f.name, "student" : $d.name } +*/ \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.7.query.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.7.query.aql new file mode 100644 index 0000000..f8da7f6 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.7.query.aql @@ -0,0 +1,14 @@ +/* + * Description : Check temporal join functionality for interval + * Expected Result : Success + * Date : 26th Jun, 2015 + */ + +use dataverse TinyCollege; + +for $f in dataset Staff +for $d in dataset Students +where interval-overlaps($d.attendance, $f.employment) +/*+ range ["F", "L", "R"] */ +order by $f.name, $d.name +return { "staff" : $f.name, "student" : $d.name } diff --git a/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.3.adm b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.3.adm new file mode 100644 index 0000000..a9bd2e4 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.3.adm @@ -0,0 +1,6 @@ +[ { "staff": "Alex", "student": "Karen" } +, { 
"staff": "Alex", "student": "Steve" } +, { "staff": "Frank", "student": "Steve" } +, { "staff": "Henry", "student": "Karen" } +, { "staff": "Henry", "student": "Steve" } + ] \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.4.adm b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.4.adm new file mode 100644 index 0000000..a9bd2e4 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.4.adm @@ -0,0 +1,6 @@ +[ { "staff": "Alex", "student": "Karen" } +, { "staff": "Alex", "student": "Steve" } +, { "staff": "Frank", "student": "Steve" } +, { "staff": "Henry", "student": "Karen" } +, { "staff": "Henry", "student": "Steve" } + ] \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.5.adm b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.5.adm new file mode 100644 index 0000000..a9bd2e4 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.5.adm @@ -0,0 +1,6 @@ +[ { "staff": "Alex", "student": "Karen" } +, { "staff": "Alex", "student": "Steve" } +, { "staff": "Frank", "student": "Steve" } +, { "staff": "Henry", "student": "Karen" } +, { "staff": "Henry", "student": "Steve" } + ] \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.6.adm b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.6.adm new file mode 100644 index 0000000..a9bd2e4 --- /dev/null +++ 
b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.6.adm @@ -0,0 +1,6 @@ +[ { "staff": "Alex", "student": "Karen" } +, { "staff": "Alex", "student": "Steve" } +, { "staff": "Frank", "student": "Steve" } +, { "staff": "Henry", "student": "Karen" } +, { "staff": "Henry", "student": "Steve" } + ] \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.7.adm b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.7.adm new file mode 100644 index 0000000..65a07bb --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.7.adm @@ -0,0 +1,11 @@ +[ { "staff": "Alex", "student": "Charles" } +, { "staff": "Alex", "student": "Frank" } +, { "staff": "Alex", "student": "Mary" } +, { "staff": "Elisabeth", "student": "Charles" } +, { "staff": "Elisabeth", "student": "Frank" } +, { "staff": "Elisabeth", "student": "Olga" } +, { "staff": "Frank", "student": "Mary" } +, { "staff": "Henry", "student": "Charles" } +, { "staff": "Henry", "student": "Frank" } +, { "staff": "Henry", "student": "Mary" } + ] \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml index d706feb..6e92521 100644 --- a/asterix-app/src/test/resources/runtimets/testsuite.xml +++ b/asterix-app/src/test/resources/runtimets/testsuite.xml @@ -19,6 +19,7 @@ <!DOCTYPE test-suite [ <!ENTITY RecordsQueries SYSTEM "queries/records/RecordsQueries.xml"> +<!ENTITY TemporalQueries SYSTEM "queries/temporal/TemporalQueries.xml"> ]> <test-suite @@ -6069,7 +6070,7 @@ <output-dir compare="Text">feeds_04</output-dir> </compilation-unit> </test-case> - + <test-case FilePath="feeds"> <compilation-unit name="feeds_06"> <output-dir 
compare="Text">feeds_06</output-dir> @@ -6108,7 +6109,7 @@ <output-dir compare="Text">feeds_12</output-dir> </compilation-unit> </test-case> - + <test-case FilePath="feeds"> <compilation-unit name="issue_230_feeds"> <output-dir compare="Text">issue_230_feeds</output-dir> @@ -6124,7 +6125,7 @@ </test-group> <test-group name="hdfs"> - <test-case FilePath="hdfs"> + <test-case FilePath="hdfs"> <compilation-unit name="hdfs_shortcircuit"> <output-dir compare="Text">hdfs_shortcircuit</output-dir> </compilation-unit> @@ -6200,143 +6201,7 @@ </test-case> </test-group> <test-group name="temporal"> - <test-case FilePath="temporal"> - <compilation-unit name="overlap_bins_gby_3"> - <output-dir compare="Text">overlap_bins_gby_3</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="agg_01"> - <output-dir compare="Text">agg_01</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="overlap_bins_gby_1"> - <output-dir compare="Text">overlap_bins_gby_1</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="duration_functions"> - <output-dir compare="Text">duration_functions</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="overlap_bins_gby_0"> - <output-dir compare="Text">overlap_bins_gby_0</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="get_overlapping_interval"> - <output-dir compare="Text">get_overlapping_interval</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="overlap_bins"> - <output-dir compare="Text">overlap_bins</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="parse_02"> - <output-dir compare="Text">parse_02</output-dir> - </compilation-unit> - </test-case> - <test-case 
FilePath="temporal"> - <compilation-unit name="parse_01"> - <output-dir compare="Text">parse_01</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="day_of_week_01"> - <output-dir compare="Text">day_of_week_01</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="interval_bin"> - <output-dir compare="Text">interval_bin</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="interval_bin_gby_0"> - <output-dir compare="Text">interval_bin_gby_0</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="interval_bin_gby_1"> - <output-dir compare="Text">interval_bin_gby_1</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="accessors"> - <output-dir compare="Text">accessors</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="accessors_interval"> - <output-dir compare="Text">accessors_interval</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="accessors_interval_null"> - <output-dir compare="Text">accessors_interval_null</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="adjust_timezone"> - <output-dir compare="Text">adjust_timezone</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="calendar_duration"> - <output-dir compare="Text">calendar_duration</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="date_functions"> - <output-dir compare="Text">date_functions</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="datetime_functions"> - <output-dir 
compare="Text">datetime_functions</output-dir> - </compilation-unit> - </test-case> - <!-- - <test-case FilePath="temporal"> - <compilation-unit name="insert_from_delimited_ds"> - <output-dir compare="Text">insert_from_delimited_ds</output-dir> - </compilation-unit> - </test-case> - --> - <test-case FilePath="temporal"> - <compilation-unit name="insert_from_ext_ds"> - <output-dir compare="Text">insert_from_ext_ds</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="insert_from_ext_ds_2"> - <output-dir compare="Text">insert_from_ext_ds_2</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="interval_functions"> - <output-dir compare="Text">interval_functions</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="time_functions"> - <output-dir compare="Text">time_functions</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="constructor"> - <compilation-unit name="interval"> - <output-dir compare="Text">interval</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="temporal"> - <compilation-unit name="duration_comps"> - <output-dir compare="Text">duration_comps</output-dir> - </compilation-unit> - </test-case> + &TemporalQueries; </test-group> <test-group name="leftouterjoin"> <test-case FilePath="leftouterjoin"> diff --git a/asterix-aql/src/main/java/org/apache/asterix/aql/util/RangeMapBuilder.java b/asterix-aql/src/main/java/org/apache/asterix/aql/util/RangeMapBuilder.java index b18c317..cf20a48 100644 --- a/asterix-aql/src/main/java/org/apache/asterix/aql/util/RangeMapBuilder.java +++ b/asterix-aql/src/main/java/org/apache/asterix/aql/util/RangeMapBuilder.java @@ -63,7 +63,9 @@ AQLParser parser = new AQLParser((String) hint); List<Statement> hintStatements = parser.parse(); - if (hintStatements.size() != 1) { + if (hintStatements.size() == 0) { + throw new 
ParseException("No range hint was supplied to the RangeMapBuilder."); + } else if (hintStatements.size() != 1) { throw new ParseException("Only one range statement is allowed for the range hint."); } diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj index 0aab549..f1b1b85 100644 --- a/asterix-aql/src/main/javacc/AQL.jj +++ b/asterix-aql/src/main/javacc/AQL.jj @@ -116,6 +116,7 @@ import org.apache.asterix.common.annotations.FieldValFileSameIndexDataGen; import org.apache.asterix.common.annotations.IRecordFieldDataGen; import org.apache.asterix.common.annotations.InsertRandIntDataGen; +import org.apache.asterix.common.annotations.IntervalJoinExpressionAnnotation; import org.apache.asterix.common.annotations.ListDataGen; import org.apache.asterix.common.annotations.ListValFileDataGen; import org.apache.asterix.common.annotations.SkipSecondaryIndexSearchExpressionAnnotation; @@ -146,7 +147,7 @@ private static final String DATETIME_ADD_RAND_HOURS_HINT = "datetime-add-rand-hours"; private static final String DATETIME_BETWEEN_YEARS_HINT = "datetime-between-years"; private static final String HASH_GROUP_BY_HINT = "hash"; - private static final String INDEXED_NESTED_LOOP_JOIN_HINT = "indexnl"; + private static final String INDEXED_NESTED_LOOP_JOIN_HINT = IndexedNLJoinExpressionAnnotation.HINT_STRING; private static final String INMEMORY_HINT = "inmem"; private static final String INSERT_RAND_INT_HINT = "insert-rand-int"; private static final String INTERVAL_HINT = "interval"; @@ -442,7 +443,7 @@ | ("internal" | "temporary" { temp = token.image.toLowerCase().equals("temporary"); } - )? + )? <DATASET> nameComponents = QualifiedName() <LEFTPAREN> typeName = Identifier() <RIGHTPAREN> ifNotExists = IfNotExists() @@ -656,7 +657,7 @@ FunctionSignature appliedFunction = null; CreateFeedStatement cfs = null; Pair<Identifier,Identifier> sourceNameComponents = null; - + } { ( @@ -668,7 +669,7 @@ } | ("primary")? 
"feed" nameComponents = QualifiedName() ifNotExists = IfNotExists() - "using" adapterName = AdapterName() properties = Configuration() (appliedFunction = ApplyFunction())? + "using" adapterName = AdapterName() properties = Configuration() (appliedFunction = ApplyFunction())? { cfs = new CreatePrimaryFeedStatement(nameComponents, adapterName, properties, appliedFunction, ifNotExists); @@ -681,8 +682,8 @@ CreateFeedPolicyStatement FeedPolicySpecification() throws ParseException: { - String policyName = null; - String basePolicyName = null; + String policyName = null; + String basePolicyName = null; String sourcePolicyFile = null; String definition = null; boolean ifNotExists = false; @@ -692,18 +693,18 @@ { ( "ingestion" "policy" policyName = Identifier() ifNotExists = IfNotExists() - <FROM> - ("policy" basePolicyName = Identifier() properties = Configuration() ("definition" definition = StringLiteral())? + <FROM> + ("policy" basePolicyName = Identifier() properties = Configuration() ("definition" definition = StringLiteral())? { cfps = new CreateFeedPolicyStatement(policyName, basePolicyName, properties, definition, ifNotExists); } - | "path" sourcePolicyFile = Identifier() ("definition" definition = StringLiteral())? + | "path" sourcePolicyFile = Identifier() ("definition" definition = StringLiteral())? 
{ cfps = new CreateFeedPolicyStatement(policyName, sourcePolicyFile, definition, ifNotExists); } - ) - + ) + ) { return cfps; @@ -2038,7 +2039,13 @@ } callExpr = new CallExpr(signature,argList); if (hint != null) { - if (hint.startsWith(INDEXED_NESTED_LOOP_JOIN_HINT)) { + if (IntervalJoinExpressionAnnotation.isIntervalJoinHint(hint)) { + IntervalJoinExpressionAnnotation ijea = IntervalJoinExpressionAnnotation.INSTANCE; + ijea.setObject(hint); + ijea.setJoinType(hint); + ijea.setRangeMap(RangeMapBuilder.parseHint(hint.substring(IntervalJoinExpressionAnnotation.getHintLength(hint)))); + callExpr.addHint(ijea); + } else if (hint.startsWith(INDEXED_NESTED_LOOP_JOIN_HINT)) { callExpr.addHint(IndexedNLJoinExpressionAnnotation.INSTANCE); } else if (hint.startsWith(SKIP_SECONDARY_INDEX_SEARCH_HINT)) { callExpr.addHint(SkipSecondaryIndexSearchExpressionAnnotation.INSTANCE); diff --git a/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java b/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java new file mode 100644 index 0000000..de1ee1b --- /dev/null +++ b/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.annotations; + +import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation; +import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; + +public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation { + + public static final String IOP_HINT_STRING = "interval-iop-join"; + public static final String MERGE_HINT_STRING = "interval-merge-join"; + public static final String SPATIAL_HINT_STRING = "interval-spatial-join"; + public static final IntervalJoinExpressionAnnotation INSTANCE = new IntervalJoinExpressionAnnotation(); + + private Object object; + private IRangeMap map; + private String joinType; + + @Override + public Object getObject() { + return object; + } + + @Override + public void setObject(Object object) { + this.object = object; + } + + @Override + public IExpressionAnnotation copy() { + IntervalJoinExpressionAnnotation clone = new IntervalJoinExpressionAnnotation(); + clone.setObject(object); + return clone; + } + + public void setRangeMap(IRangeMap map) { + this.map = map; + } + + public IRangeMap getRangeMap() { + return map; + } + + public void setJoinType(String hint) { + if (hint.startsWith(IOP_HINT_STRING)) { + joinType = IOP_HINT_STRING; + } else if (hint.startsWith(MERGE_HINT_STRING)) { + joinType = MERGE_HINT_STRING; + } else if (hint.startsWith(SPATIAL_HINT_STRING)) { + joinType = SPATIAL_HINT_STRING; + } + } + + public String getRangeType() { + return joinType; + } + + public boolean isIopJoin() { + if (joinType.equals(IOP_HINT_STRING)) { + return true; + } + return false; + } + + public boolean isMergeJoin() { + if (joinType.equals(MERGE_HINT_STRING)) { + return true; + } + return false; + } + + public boolean isSpatialJoin() { + if (joinType.equals(SPATIAL_HINT_STRING)) { + return true; + } + return false; + } + + public static boolean 
isIntervalJoinHint(String hint) { + if (hint.startsWith(IOP_HINT_STRING) || hint.startsWith(MERGE_HINT_STRING) + || hint.startsWith(SPATIAL_HINT_STRING)) { + return true; + } else { + return false; + } + } + + public static int getHintLength(String hint) { + if (hint.startsWith(IOP_HINT_STRING)) { + return IOP_HINT_STRING.length(); + } else if (hint.startsWith(MERGE_HINT_STRING)) { + return MERGE_HINT_STRING.length(); + } else if (hint.startsWith(SPATIAL_HINT_STRING)) { + return SPATIAL_HINT_STRING.length(); + } + return 0; + } + +} diff --git a/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java b/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java index d6da48c..519458e 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java @@ -22,7 +22,7 @@ public class SkipSecondaryIndexSearchExpressionAnnotation implements IExpressionAnnotation { - public static final String SKIP_SECONDARY_INDEX_SEARCH_ANNOTATION_KEY = "skip-index"; + public static final String HINT_STRING = "skip-index"; public static final SkipSecondaryIndexSearchExpressionAnnotation INSTANCE = new SkipSecondaryIndexSearchExpressionAnnotation(); private Object object; diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AIntervalPartialBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AIntervalPartialBinaryComparatorFactory.java index f7c4428..aa38010 100644 --- a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AIntervalPartialBinaryComparatorFactory.java +++ 
b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AIntervalPartialBinaryComparatorFactory.java @@ -42,19 +42,20 @@ @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { - int c = Double.compare( + int c = Long.compare( AInt64SerializerDeserializer.getLong(b1, s1 + AIntervalSerializerDeserializer.getIntervalStartOffset()), AInt64SerializerDeserializer.getLong(b2, s2 + AIntervalSerializerDeserializer.getIntervalStartOffset())); if (c == 0) { - c = Double.compare( + c = Long.compare( AInt64SerializerDeserializer.getLong(b1, s1 + AIntervalSerializerDeserializer.getIntervalEndOffset()), AInt64SerializerDeserializer.getLong(b2, s2 + AIntervalSerializerDeserializer.getIntervalEndOffset())); if (c == 0) { - c = Byte.compare(b1[s1 + 16], b2[s2 + 16]); + c = Byte.compare(b1[s1 + AIntervalSerializerDeserializer.getIntervalTagOffset()], b2[s2 + + AIntervalSerializerDeserializer.getIntervalTagOffset()]); } } return c; diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/AllenRelationsBinaryComparatorFactoryProvider.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/AllenRelationsBinaryComparatorFactoryProvider.java new file mode 100644 index 0000000..564a793 --- /dev/null +++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/AllenRelationsBinaryComparatorFactoryProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.dataflow.data.nontagged.comparators.allenrelations; + +import java.io.Serializable; + +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +public class AllenRelationsBinaryComparatorFactoryProvider implements IBinaryComparatorFactoryProvider, Serializable { + + private static final long serialVersionUID = 1L; + public static final AllenRelationsBinaryComparatorFactoryProvider INSTANCE = new AllenRelationsBinaryComparatorFactoryProvider(); + + private AllenRelationsBinaryComparatorFactoryProvider() { + } + + @Override + public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending) { + // During a comparison, since proper type promotion among several numeric types are required, + // we will use AObjectAscBinaryComparatorFactory, instead of using a specific comparator + return OverlapIntervalBinaryComparatorFactory.INSTANCE; + } + + public IBinaryComparatorFactory getBinaryComparatorFactory(FunctionIdentifier fid, boolean ascending) { + return OverlapIntervalBinaryComparatorFactory.INSTANCE; + } + +} diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/OverlapIntervalBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/OverlapIntervalBinaryComparatorFactory.java new file mode 100644 index 
0000000..f0443b7 --- /dev/null +++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/OverlapIntervalBinaryComparatorFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.dataflow.data.nontagged.comparators.allenrelations; + +import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +public class OverlapIntervalBinaryComparatorFactory implements IBinaryComparatorFactory { + + private static final long serialVersionUID = 1L; + + public static final OverlapIntervalBinaryComparatorFactory INSTANCE = new OverlapIntervalBinaryComparatorFactory(); + + private OverlapIntervalBinaryComparatorFactory() { + + } + + @Override + public IBinaryComparator createBinaryComparator() { + return new IBinaryComparator() { + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + long start0 = AIntervalSerializerDeserializer.getIntervalStart(b1, s1); + long end0 = AIntervalSerializerDeserializer.getIntervalEnd(b1, s1); + long start1 = AIntervalSerializerDeserializer.getIntervalStart(b2, s2); + long end1 = AIntervalSerializerDeserializer.getIntervalEnd(b2, s2); + + if (start0 < start1 && end0 > start1 && end1 > end0) { + // These intervals overlap + return 0; + } + if (end0 < start1) { + return 1; + } + return -1; + } + }; + } +} diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalProjectBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalProjectBinaryComparatorFactory.java new file mode 100644 index 0000000..7a358f2 --- /dev/null +++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalProjectBinaryComparatorFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.dataflow.data.nontagged.comparators.rangeinterval; + +import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer; +import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +public class RangeIntervalProjectBinaryComparatorFactory implements IBinaryComparatorFactory { + + private static final long serialVersionUID = 1L; + + public static final RangeIntervalProjectBinaryComparatorFactory INSTANCE = new RangeIntervalProjectBinaryComparatorFactory(); + + private RangeIntervalProjectBinaryComparatorFactory() { + + } + + /* + * The comparator uses the range map split value and an interval. + * + * -1: split point is less than the interval start point. + * 0: split point is equal to the interval start point + * 1: split point is greater than the interval start point. 
+ */ + @Override + public IBinaryComparator createBinaryComparator() { + return new IBinaryComparator() { + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + return Long.compare(AInt64SerializerDeserializer.getLong(b1, s1), AInt64SerializerDeserializer + .getLong(b2, s2 + AIntervalSerializerDeserializer.getIntervalStartOffset())); + } + }; + } +} diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalReplicateBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalReplicateBinaryComparatorFactory.java new file mode 100644 index 0000000..19fce56 --- /dev/null +++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalReplicateBinaryComparatorFactory.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.dataflow.data.nontagged.comparators.rangeinterval; + +import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer; +import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +public class RangeIntervalReplicateBinaryComparatorFactory implements IBinaryComparatorFactory { + + private static final long serialVersionUID = 1L; + + public static final RangeIntervalReplicateBinaryComparatorFactory INSTANCE = new RangeIntervalReplicateBinaryComparatorFactory(); + + private RangeIntervalReplicateBinaryComparatorFactory() { + + } + + /* + * The comparator uses the range map split value and an interval. + * + * -1: split point is less than the interval start point. + * 0: split point is equal to or greater than the interval start point + * 1: never happens + */ + @Override + public IBinaryComparator createBinaryComparator() { + return new IBinaryComparator() { + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + int c = Long.compare(AInt64SerializerDeserializer.getLong(b1, s1), AInt64SerializerDeserializer + .getLong(b2, s2 + AIntervalSerializerDeserializer.getIntervalStartOffset())); + if (c > 0) { + c = 0; + } + return c; + } + }; + } + +} diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalSplitBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalSplitBinaryComparatorFactory.java new file mode 100644 index 0000000..8fc2b54 --- /dev/null +++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalSplitBinaryComparatorFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.dataflow.data.nontagged.comparators.rangeinterval; + +import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer; +import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +public class RangeIntervalSplitBinaryComparatorFactory implements IBinaryComparatorFactory { + + private static final long serialVersionUID = 1L; + + public static final RangeIntervalSplitBinaryComparatorFactory INSTANCE = new RangeIntervalSplitBinaryComparatorFactory(); + + private RangeIntervalSplitBinaryComparatorFactory() { + + } + + /* + * The comparator uses the range map split value and an interval. + * + * -1: split point is less than the interval start point. + * 0: split point is in the interval + * 1: split point is greater than the interval end point. 
+ */ + @Override + public IBinaryComparator createBinaryComparator() { + return new IBinaryComparator() { + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + int c = Double.compare(AInt64SerializerDeserializer.getLong(b1, s1), AInt64SerializerDeserializer + .getLong(b2, s2 + AIntervalSerializerDeserializer.getIntervalStartOffset())); + if (c > 0) { + c = Double.compare(AInt64SerializerDeserializer.getLong(b1, s1), AInt64SerializerDeserializer + .getLong(b2, s2 + AIntervalSerializerDeserializer.getIntervalEndOffset())); + if (c < 0) { + c = 0; + } + } + return c; + } + }; + } + +} diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AIntervalSerializerDeserializer.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AIntervalSerializerDeserializer.java index 5300b28..d2735b4 100644 --- a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AIntervalSerializerDeserializer.java +++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AIntervalSerializerDeserializer.java @@ -85,13 +85,17 @@ return 8; } + public static int getIntervalTagOffset() { + return 16; + } + public static byte getIntervalTimeType(byte[] data, int offset) { return data[offset + 8 * 2]; } /** * create an interval value from two given datetime instance. - * + * * @param interval * @param out * @throws HyracksDataException diff --git a/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AIntervalPointable.java b/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AIntervalPointable.java new file mode 100644 index 0000000..73af3d2 --- /dev/null +++ b/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AIntervalPointable.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.om.pointables.nonvisitor; + +import org.apache.asterix.om.util.container.IObjectFactory; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.data.std.api.AbstractPointable; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.api.IPointableFactory; +import org.apache.hyracks.data.std.primitive.BytePointable; +import org.apache.hyracks.data.std.primitive.LongPointable; + +/* + * This class interprets the binary data representation of an interval. + * + * The interval can be time, date, or datetime defined by the tag. 
+ * + * Interval { + * int startPoint; + * int endPoint; + * byte tag; + * } + */ +public class AIntervalPointable extends AbstractPointable { + + public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() { + private static final long serialVersionUID = 1L; + + @Override + public boolean isFixedLength() { + return true; + } + + @Override + public int getFixedLength() { + return 17; + } + }; + + public static final IPointableFactory FACTORY = new IPointableFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IPointable createPointable() { + return new AIntervalPointable(); + } + + @Override + public ITypeTraits getTypeTraits() { + return TYPE_TRAITS; + } + }; + + public static final IObjectFactory<IPointable, String> ALLOCATOR = new IObjectFactory<IPointable, String>() { + public IPointable create(String id) { + return new AIntervalPointable(); + } + }; + + private static final int TAG_SIZE = 1; + private static final int START_LENGTH_SIZE = 8; + private static final int END_LENGTH_SIZE = 8; + + public long getStart() { + return LongPointable.getLong(bytes, getStartOffset()); + } + + public int getStartOffset() { + return start; + } + + public int getStartSize() { + return START_LENGTH_SIZE; + } + + public long getEnd() { + return LongPointable.getLong(bytes, getEndOffset()); + } + + public int getEndOffset() { + return getStartOffset() + getStartSize(); + } + + public int getEndSize() { + return END_LENGTH_SIZE; + } + + public byte getTag() { + return BytePointable.getByte(bytes, getTagOffset()); + } + + public int getTagOffset() { + return getEndOffset() + getEndSize(); + } + + public int getTagSize() { + return TAG_SIZE; + } + +} diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoinLocks.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoinLocks.java new file mode 100644 index 0000000..f23bd55 --- /dev/null +++ 
/**
 * Holds, per partition, one lock and two conditions created from that lock: one
 * named "left" and one named "right".
 *
 * All methods synchronize on this object because the backing lists are plain
 * {@link ArrayList}s that {@link #setPartitions(int)} may grow while another thread
 * is looking up the entry of a different partition. The monitor is held only for
 * the list access, never while a returned lock is being acquired.
 *
 * NOTE(review): the class is Serializable; a deserialized copy would carry its own
 * locks. Confirm that both sides of a partition always share one instance.
 */
public class SortMergeIntervalJoinLocks implements Serializable {
    private static final long serialVersionUID = 1L;

    private final List<Lock> lock = new ArrayList<Lock>();
    private final List<Condition> left = new ArrayList<Condition>();
    private final List<Condition> right = new ArrayList<Condition>();

    /**
     * Ensures entries exist for partitions {@code 0 .. partitions - 1}. Existing entries
     * are kept, so the call is idempotent and never shrinks the lists.
     *
     * @param partitions
     *            the number of partitions that must be addressable
     */
    public synchronized void setPartitions(int partitions) {
        for (int i = lock.size(); i < partitions; ++i) {
            Lock partitionLock = new ReentrantLock();
            lock.add(partitionLock);
            left.add(partitionLock.newCondition());
            right.add(partitionLock.newCondition());
        }
    }

    /**
     * @param partition
     *            the partition index
     * @return the lock of the partition
     * @throws IndexOutOfBoundsException
     *             if no entry was created for the partition
     */
    public synchronized Lock getLock(int partition) {
        return lock.get(partition);
    }

    /**
     * @param partition
     *            the partition index
     * @return the "left" condition bound to the lock of the partition
     * @throws IndexOutOfBoundsException
     *             if no entry was created for the partition
     */
    public synchronized Condition getLeft(int partition) {
        return left.get(partition);
    }

    /**
     * @param partition
     *            the partition index
     * @return the "right" condition bound to the lock of the partition
     * @throws IndexOutOfBoundsException
     *             if no entry was created for the partition
     */
    public synchronized Condition getRight(int partition) {
        return right.get(partition);
    }
}
+ */ +package org.apache.asterix.runtime.operators.interval; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.ActivityId; +import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.TaskId; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator; +import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; +import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractStateObject; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable; + +public class SortMergeIntervalJoinOperatorDescriptor extends AbstractOperatorDescriptor { + private static final int LEFT_ACTIVITY_ID = 0; + private static final int RIGHT_ACTIVITY_ID = 1; + private final IBinaryComparatorFactory[] comparatorFactories; + private final int[] keys0; + private final int[] keys1; + private final int memSize; + + public SortMergeIntervalJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memSize, + RecordDescriptor recordDescriptor, int[] keys0, int[] keys1, + IBinaryComparatorFactory[] comparatorFactories) { + super(spec, 2, 1); + recordDescriptors[0] = recordDescriptor; + 
this.comparatorFactories = comparatorFactories; + this.keys0 = keys0; + this.keys1 = keys1; + this.memSize = memSize; + } + + private static final long serialVersionUID = 1L; + + @Override + public void contributeActivities(IActivityGraphBuilder builder) { + SortMergeIntervalJoinLocks locks = new SortMergeIntervalJoinLocks(); + ActivityId p1Aid = new ActivityId(odId, LEFT_ACTIVITY_ID); + ActivityId p2Aid = new ActivityId(odId, RIGHT_ACTIVITY_ID); + LeftActivityNode phase1 = new LeftActivityNode(p1Aid, p2Aid, locks); + RightActivityNode phase2 = new RightActivityNode(p2Aid, p1Aid, locks); + + builder.addActivity(this, phase1); + builder.addSourceEdge(1, phase1, 0); + + builder.addActivity(this, phase2); + builder.addSourceEdge(0, phase2, 0); + + builder.addTargetEdge(0, phase2, 0); + } + + public static class SortMergeIntervalJoinTaskState extends AbstractStateObject { + private SortMergeIntervalStatus status; + private SortMergeIntervalJoiner joiner; + private boolean failed; + + private SortMergeIntervalJoinTaskState(JobId jobId, TaskId taskId) { + super(jobId, taskId); + status = new SortMergeIntervalStatus(); + } + + @Override + public void toBytes(DataOutput out) throws IOException { + + } + + @Override + public void fromBytes(DataInput in) throws IOException { + } + } + + private class LeftActivityNode extends AbstractActivityNode { + private static final long serialVersionUID = 1L; + + private final ActivityId joinAid; + private final SortMergeIntervalJoinLocks locks; + + public LeftActivityNode(ActivityId id, ActivityId joinAid, SortMergeIntervalJoinLocks locks) { + super(id); + this.joinAid = joinAid; + this.locks = locks; + } + + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) + throws HyracksDataException { + locks.setPartitions(nPartitions); + final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 
0); + final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; + for (int i = 0; i < comparatorFactories.length; ++i) { + comparators[i] = comparatorFactories[i].createBinaryComparator(); + } + return new LeftOperator(ctx, partition, inRecordDesc, comparators); + } + + private class LeftOperator extends AbstractUnaryOutputOperatorNodePushable { + + private final IHyracksTaskContext ctx; + private final int partition; + private final IBinaryComparator[] comparators; + private final RecordDescriptor leftRD; + + public LeftOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc, + IBinaryComparator[] comparators) { + this.ctx = ctx; + this.partition = partition; + this.leftRD = inRecordDesc; + this.comparators = comparators; + } + + @Override + public int getInputArity() { + return inputArity; + } + + @Override + public IFrameWriter getInputFrameWriter(int index) { + return new IFrameWriter() { + private SortMergeIntervalJoinTaskState state; + private boolean first = true; + + @Override + public void open() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + writer.open(); + state = new SortMergeIntervalJoinTaskState(ctx.getJobletContext().getJobId(), + new TaskId(getActivityId(), partition)); + state.status.openLeft(); + state.joiner = new SortMergeIntervalJoiner(ctx, memSize, partition, state.status, locks, + new FrameTuplePairComparator(keys0, keys1, comparators), writer, leftRD); + locks.getRight(partition).signal(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + locks.getLock(partition).lock(); + if (first) { + state.status.dataLeft(); + first = false; + } + try { + state.joiner.setLeftFrame(buffer); + state.joiner.processMerge(); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void fail() throws 
HyracksDataException { + locks.getLock(partition).lock(); + try { + state.failed = true; + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void close() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.status.leftHasMore = false; + if (state.failed) { + writer.fail(); + } else { + state.joiner.processMerge(); + writer.close(); + } + state.status.closeLeft(); + } finally { + locks.getLock(partition).unlock(); + } + } + }; + } + } + } + + private class RightActivityNode extends AbstractActivityNode { + private static final long serialVersionUID = 1L; + + private final ActivityId joinAid; + private SortMergeIntervalJoinLocks locks; + + public RightActivityNode(ActivityId id, ActivityId joinAid, SortMergeIntervalJoinLocks locks) { + super(id); + this.joinAid = joinAid; + this.locks = locks; + } + + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) + throws HyracksDataException { + locks.setPartitions(nPartitions); + RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); + return new RightOperator(ctx, partition, inRecordDesc); + } + + private class RightOperator extends AbstractUnaryOutputOperatorNodePushable { + + private int partition; + private IHyracksTaskContext ctx; + private final RecordDescriptor rightRD; + + public RightOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) { + this.ctx = ctx; + this.partition = partition; + this.rightRD = inRecordDesc; + } + + @Override + public int getInputArity() { + return inputArity; + } + + @Override + public IFrameWriter getInputFrameWriter(int index) { + return new IFrameWriter() { + private SortMergeIntervalJoinTaskState state; + private boolean first = true; + + @Override + public void open() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + do { + // 
Wait for the state to be set in the context form Left. + state = (SortMergeIntervalJoinTaskState) ctx + .getStateObject(new TaskId(joinAid, partition)); + if (state == null) { + locks.getRight(partition).await(); + } + } while (state == null); + state.joiner.setRightRecordDescriptor(rightRD); + state.status.openRight(); + } catch (InterruptedException e) { + throw new HyracksDataException("RightOperator interrupted exceptrion", e); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + locks.getLock(partition).lock(); + if (first) { + state.status.dataRight(); + first = false; + } + try { + while (state.status.loadRightFrame == false) { + // Wait for the state to request right frame. + locks.getRight(partition).await(); + }; + state.joiner.setRightFrame(buffer); + locks.getLeft(partition).signal(); + } catch (InterruptedException e) { + throw new HyracksDataException("RightOperator interrupted exceptrion", e); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void fail() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.failed = true; + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void close() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.status.closeRight(); + } finally { + locks.getLock(partition).unlock(); + } + } + }; + } + } + } + +} \ No newline at end of file diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoiner.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoiner.java new file mode 100644 index 0000000..b5cdd19 --- /dev/null +++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoiner.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.operators.interval; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.asterix.runtime.operators.interval.SortMergeIntervalStatus.BranchStatus; +import org.apache.asterix.runtime.operators.interval.SortMergeIntervalStatus.RunFileStatus; +import org.apache.hyracks.api.comm.FixedSizeFrame; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.io.RunFileReader; +import org.apache.hyracks.dataflow.common.io.RunFileWriter; 
+import org.apache.hyracks.dataflow.std.sort.buffermanager.IFramePool; +import org.apache.hyracks.dataflow.std.sort.buffermanager.IFrameTupleBufferAccessor; +import org.apache.hyracks.dataflow.std.sort.buffermanager.ITupleBufferManager; +import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFramePool; +import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableTupleMemoryManager; +import org.apache.hyracks.dataflow.std.structures.TuplePointer; + +public class SortMergeIntervalJoiner { + + private static final int MEMORY_INDEX = -1; + + private final FrameTupleAccessor accessorLeft; + private FrameTupleAccessor accessorRight; + + private SortMergeIntervalJoinLocks locks; + private SortMergeIntervalStatus status; + + private final IFrameWriter writer; + + private ByteBuffer leftBuffer; + private ByteBuffer rightBuffer; + private int leftTupleIndex; + private int rightRunFileTupleIndex; + private int rightBufferTupleIndex; + + private final ITupleBufferManager bufferManager; + private final List<TuplePointer> memoryTuples; + private final IFrameTupleBufferAccessor memoryAccessor; + + private final IFrame runFileBuffer; + private final FrameTupleAppender runFileAppender; + private final RunFileWriter runFileWriter; + private RunFileReader runFileReader; + private IFrameTupleAccessor runFileAccessor; + + private final FrameTupleAppender resultAppender; + + private final FrameTuplePairComparator comparator; + + private final int partition; + + public SortMergeIntervalJoiner(IHyracksTaskContext ctx, int memorySize, int partition, + SortMergeIntervalStatus status, SortMergeIntervalJoinLocks locks, FrameTuplePairComparator comparator, + IFrameWriter writer, RecordDescriptor leftRd) throws HyracksDataException { + this.partition = partition; + this.status = status; + this.locks = locks; + this.writer = writer; + this.comparator = comparator; + + accessorLeft = new FrameTupleAccessor(leftRd); + + // Memory + IFramePool framePool = new VariableFramePool(ctx, 
(memorySize - 1) * ctx.getInitialFrameSize()); + bufferManager = new VariableTupleMemoryManager(framePool, leftRd); + memoryTuples = new ArrayList<TuplePointer>(); + memoryAccessor = bufferManager.getFrameTupleAccessor(); + + // Run File and frame cache + FileReference file = ctx.getJobletContext() + .createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString()); + runFileWriter = new RunFileWriter(file, ctx.getIOManager()); + runFileWriter.open(); + runFileBuffer = new FixedSizeFrame(ctx.allocateFrame(ctx.getInitialFrameSize())); + runFileAppender = new FrameTupleAppender(new VSizeFrame(ctx)); + + // Result + resultAppender = new FrameTupleAppender(new VSizeFrame(ctx)); + } + + private boolean addToMemory(IFrameTupleAccessor accessor, int idx) throws HyracksDataException { + TuplePointer tuplePointer = new TuplePointer(); + if (bufferManager.insertTuple(accessor, idx, tuplePointer)) { + memoryTuples.add(tuplePointer); + return true; + } + return false; + } + + private void addToResult(IFrameTupleAccessor accessor1, int index1, IFrameTupleAccessor accessor2, int index2) + throws HyracksDataException { + FrameUtils.appendConcatToWriter(writer, resultAppender, accessor1, index1, accessor2, index2); + } + + private void addToRunFile(IFrameTupleAccessor accessor, int idx) throws HyracksDataException { + if (!runFileAppender.append(accessor, idx)) { + runFileAppender.flush(runFileWriter, true); + runFileAppender.append(accessor, idx); + } + } + + private void openFromRunFile() throws HyracksDataException { + status.runFileStatus = RunFileStatus.READING; + + // Create reader + runFileReader = runFileWriter.createReader(); + runFileReader.open(); + rightRunFileTupleIndex = 0; + + // Load first frame + runFileReader.nextFrame(runFileBuffer); + accessorRight.reset(runFileBuffer.getBuffer()); + } + + private void closeFromRunFile() throws HyracksDataException { + status.runFileStatus = RunFileStatus.NOT_USED; + runFileReader.close(); + } + + private void 
flushMemory() throws HyracksDataException { + bufferManager.reset(); + memoryTuples.clear(); + } + + private void incrementLeftTuple() { + leftTupleIndex++; + } + + private int getRightTupleIndex() throws HyracksDataException { + if (status.runFileStatus == RunFileStatus.READING) { + return rightRunFileTupleIndex; + } else { + return rightBufferTupleIndex; + } + } + + private void incrementRightTuple() throws HyracksDataException { + if (status.runFileStatus == RunFileStatus.READING) { + ++rightRunFileTupleIndex; + } else { + rightBufferTupleIndex++; + } + } + + /** + * Ensures a frame exists for the right branch, either from memory or the run file. + * + * @throws HyracksDataException + */ + private boolean loadRightTuple() throws HyracksDataException { + boolean loaded = true; + if (status.runFileStatus == RunFileStatus.READING) { + if (rightRunFileTupleIndex >= accessorRight.getTupleCount()) { + if (runFileReader.nextFrame(runFileBuffer)) { + accessorRight.reset(runFileBuffer.getBuffer()); + rightRunFileTupleIndex = 0; + } else { + closeFromRunFile(); + return loadRightTuple(); + } + } + } else { + if (rightBufferTupleIndex >= accessorRight.getTupleCount()) { + status.loadRightFrame = true; + locks.getRight(partition).signal(); + try { + while (status.loadRightFrame && status.getRightStatus() == BranchStatus.DATA_PROCESSING) { + locks.getLeft(partition).await(); + } + } catch (InterruptedException e) { + throw new HyracksDataException( + "SortMergeIntervalJoin interrupted exception while attempting to load right tuple", e); + } + status.loadRightFrame = false; + loaded = (rightBufferTupleIndex == 0); + if (!loaded) { + status.rightHasMore = false; + } + } + } + return loaded; + } + + /** + * Ensures a frame exists for the right branch, either from memory or the run file. 
+ * + * @throws HyracksDataException + */ + private boolean loadLeftTuple() throws HyracksDataException { + if (status.getLeftStatus() == BranchStatus.DATA_PROCESSING && leftTupleIndex >= accessorLeft.getTupleCount()) { + return false; + } + return true; + } + + // memory management + private boolean memoryHasTuples() { + return bufferManager.getNumTuples() > 0; + } + + public void processMerge() throws HyracksDataException { + // Ensure right tuple loaded into accessorRight + while (loadRightTuple() && status.rightHasMore) { + // ********************* + // Left side from memory + // ********************* + if (status.reloadingLeftFrame) { + // Skip the right frame memory processing. + status.reloadingLeftFrame = false; + } else { + if (status.runFileStatus == RunFileStatus.WRITING) { + // Write right tuple to run file + addToRunFile(accessorRight, getRightTupleIndex()); + } + + for (Iterator<TuplePointer> memoryIterator = memoryTuples.iterator(); memoryIterator.hasNext();) { + TuplePointer tp = memoryIterator.next(); + memoryAccessor.reset(tp); + int c = comparator.compare(memoryAccessor, MEMORY_INDEX, accessorRight, getRightTupleIndex()); + if (c < 0) { + // remove from memory + bufferManager.deleteTuple(tp); + memoryIterator.remove(); + } + if (c == 0) { + // add to result + addToResult(memoryAccessor, MEMORY_INDEX, accessorRight, getRightTupleIndex()); + } + } + + if (!memoryHasTuples() && status.runFileStatus == RunFileStatus.WRITING) { + // Memory is empty and we can start processing the run file. 
+ openFromRunFile(); + flushMemory(); + } + } + + // ********************* + // Left side from stream + // ********************* + if (status.runFileStatus == RunFileStatus.NOT_USED && status.leftHasMore) { + int c = comparator.compare(accessorLeft, leftTupleIndex, accessorRight, getRightTupleIndex()); + while (c <= 0) { + if (c == 0) { + // add to result + addToResult(accessorLeft, leftTupleIndex, accessorRight, getRightTupleIndex()); + // append to memory + if (!addToMemory(accessorLeft, leftTupleIndex)) { + // go to log saving state + status.runFileStatus = RunFileStatus.WRITING; + // write right tuple to run file + addToRunFile(accessorRight, getRightTupleIndex()); + // break (do not increment left tuple) + break; + } + } + incrementLeftTuple(); + if (!loadLeftTuple()) { + return; + } + c = comparator.compare(accessorLeft, leftTupleIndex, accessorRight, getRightTupleIndex()); + } + } + incrementRightTuple(); + } + } + + public void setLeftFrame(ByteBuffer buffer) { + leftBuffer = buffer; + accessorLeft.reset(leftBuffer); + leftTupleIndex = 0; + } + + public void setRightFrame(ByteBuffer buffer) { + rightBuffer = buffer; + accessorRight.reset(rightBuffer); + rightBufferTupleIndex = 0; + } + + public void setRightRecordDescriptor(RecordDescriptor rightRd) { + accessorRight = new FrameTupleAccessor(rightRd); + } +} diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalStatus.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalStatus.java new file mode 100644 index 0000000..c60ba94 --- /dev/null +++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalStatus.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.operators.interval; + +import java.io.Serializable; + +public class SortMergeIntervalStatus implements Serializable { /* State holder: lifecycle status of the left and right branches, has-more flags, frame-reload flags and the run file mode. */ + private static final long serialVersionUID = 1L; + + public enum BranchStatus { /* lifecycle values assigned by the open, data and close methods of this class */ + UNKNOWN, + OPENED, + DATA_PROCESSING, + CLOSED, + } + + public enum RunFileStatus { /* run file mode; this class only ever assigns NOT_USED, other values are set by code that writes the public field */ + NOT_USED, + WRITING, + READING, + } + + public boolean reloadingLeftFrame = false; /* public mutable flag, initially false */ + public boolean loadRightFrame = false; /* public mutable flag, initially false */ + + public boolean leftHasMore = true; /* initially true */ + public boolean rightHasMore = true; /* initially true */ + + private BranchStatus leftStatus = BranchStatus.UNKNOWN; /* changed only through openLeft, dataLeft and closeLeft */ + private BranchStatus rightStatus = BranchStatus.UNKNOWN; /* changed only through openRight, dataRight and closeRight */ + + public RunFileStatus runFileStatus = RunFileStatus.NOT_USED; /* public mutable field, starts as NOT_USED */ + + public SortMergeIntervalStatus() { + } + + public BranchStatus getLeftStatus() { /* Returns the current left branch status. */ + return leftStatus; + } + + public BranchStatus getRightStatus() { /* Returns the current right branch status. */ + return rightStatus; + } + + public void openLeft() { /* Marks the left branch OPENED; no check of the previous status is made. */ + leftStatus = BranchStatus.OPENED; + } + + public void openRight() { /* Marks the right branch OPENED; no check of the previous status is made. */ + rightStatus = BranchStatus.OPENED; + } + + public void dataLeft() { /* Marks the left branch DATA_PROCESSING. */ + leftStatus = BranchStatus.DATA_PROCESSING; + } + + public void dataRight() { /* Marks the right branch DATA_PROCESSING. */ + rightStatus = BranchStatus.DATA_PROCESSING; + } + + public void closeLeft() { /* Marks the left branch CLOSED. */ + leftStatus = BranchStatus.CLOSED; + } + + public void closeRight() { /* Marks the right branch CLOSED. */ + rightStatus = BranchStatus.CLOSED; + } + +} diff --git a/pom.xml b/pom.xml index 8d11476..6a86cf5 100644 --- 
a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ ! "License"); you may not use this file except in compliance ! with the License. You may obtain a copy of the License at ! - ! http://www.apache.org/licenses/LICENSE-2.0 + ! http://www.apache.org/licenses/LICENSE-2.0 ! ! Unless required by applicable law or agreed to in writing, ! software distributed under the License is distributed on an @@ -55,9 +55,9 @@ <invalid.tests>**/DmlTest.java</invalid.tests> <global.test.includes>**/*TestSuite.java,**/*Test.java,${execution.tests}</global.test.includes> <global.test.excludes>${optimizer.tests},${metadata.tests},${invalid.tests},${repeated.tests}</global.test.excludes> - <!-- Versions under dependencymanagement or used in many projects via properties --> - <algebricks.version>0.2.16-incubating</algebricks.version> - <hyracks.version>0.2.16-incubating</hyracks.version> + <!-- Versions under dependency management or used in many projects via properties --> + <algebricks.version>0.2.16-SNAPSHOT</algebricks.version> + <hyracks.version>0.2.16-SNAPSHOT</hyracks.version> <hadoop.version>2.2.0</hadoop.version> <junit.version>4.11</junit.version> <commons.io.version>2.4</commons.io.version> @@ -309,6 +309,15 @@ <id>algebricks-snapshots</id> <url>http://obelix.ics.uci.edu/nexus/content/repositories/algebricks-snapshots/</url> </repository> + <repository> + <snapshots> + <enabled>true</enabled> + <updatePolicy>always</updatePolicy> + <checksumPolicy>fail</checksumPolicy> + </snapshots> + <id>apache-staging</id> + <url>https://repository.apache.org/content/repositories/orgapacheasterix-1005/</url> + </repository> </repositories> <dependencyManagement> <dependencies> -- To view, visit https://asterix-gerrit.ics.uci.edu/424 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ie6bca14b81c41859f0f89cdc1fc95efd393e79f3 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Preston Carman <[email protected]>
