Added optional left/right record counts and max durations to the interval-partition-join hint
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/fd84e345 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/fd84e345 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/fd84e345 Branch: refs/heads/ecarm002/interval_join_merge Commit: fd84e345ec29f9b85f32132eef1f6e7f98941866 Parents: 0f533e8 Author: Preston Carman <prest...@apache.org> Authored: Wed Jul 6 18:15:43 2016 -0700 Committer: Preston Carman <prest...@apache.org> Committed: Wed Jul 6 18:15:43 2016 -0700 ---------------------------------------------------------------------- .../IntervalPartitionJoinPOperator.java | 20 +- .../rules/IntervalSplitPartitioningRule.java | 1 - .../asterix/optimizer/rules/util/JoinUtils.java | 21 +- .../interval_overlapping.11.query.aql | 2 +- .../interval_overlapping.12.query.aql | 2 +- .../IntervalJoinExpressionAnnotation.java | 65 +- ...econdaryIndexSearchExpressionAnnotation.java | 15 +- .../asterix-lang-aql/src/main/javacc/AQL.jj | 4 +- .../IntervalPartitionJoin.java | 649 ------------------- ...IntervalPartitionJoinOperatorDescriptor.java | 16 +- .../IntervalPartitionJoiner.java | 649 +++++++++++++++++++ .../IntervalPartitionUtil.java | 26 +- .../AbstractExpressionAnnotation.java | 35 + .../ExpressionAnnotationNoCopyImpl.java | 14 +- .../IndexedNLJoinExpressionAnnotation.java | 14 +- 15 files changed, 788 insertions(+), 745 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java index 414d0b4..1eff2aa 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java @@ -33,18 +33,18 @@ import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; public class IntervalPartitionJoinPOperator extends AbstractIntervalJoinPOperator { private final int memSizeInFrames; - private final int probeTupleCount; - private final int probeMaxDuration; - private final int buildTupleCount; - private final int buildMaxDuration; + private final long probeTupleCount; + private final long probeMaxDuration; + private final long buildTupleCount; + private final long buildMaxDuration; private final int avgTuplesInFrame; private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinPOperator.class.getName()); public IntervalPartitionJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, - int memSizeInFrames, int buildTupleCount, int probeTupleCount, int buildMaxDuration, int probeMaxDuration, - int avgTuplesInFrame, IIntervalMergeJoinCheckerFactory mjcf, IRangeMap rangeMap) { + int memSizeInFrames, long buildTupleCount, long probeTupleCount, long buildMaxDuration, + long probeMaxDuration, int avgTuplesInFrame, IIntervalMergeJoinCheckerFactory mjcf, IRangeMap rangeMap) { super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities, mjcf, rangeMap); this.memSizeInFrames = memSizeInFrames; this.buildTupleCount = buildTupleCount; @@ -62,19 +62,19 @@ public class IntervalPartitionJoinPOperator extends AbstractIntervalJoinPOperato + "."); } - public int getProbeTupleCount() { + public long getProbeTupleCount() { 
return probeTupleCount; } - public int getProbeMaxDuration() { + public long getProbeMaxDuration() { return probeMaxDuration; } - public int getBuildTupleCount() { + public long getBuildTupleCount() { return buildTupleCount; } - public int getBuildMaxDuration() { + public long getBuildMaxDuration() { return buildMaxDuration; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java index 2772e68..629606c 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java @@ -370,7 +370,6 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule { flags[i] = true; } ReplicateOperator splitOperator = new ReplicateOperator(flags.length, flags); - // ReplicatePOperator splitPOperator = new ReplicatePOperator(); IntervalLocalRangeSplitterPOperator splitPOperator = new IntervalLocalRangeSplitterPOperator(joinKeyLogicalVars, rangeMap); splitOperator.setPhysicalOperator(splitPOperator); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java index b2f010c..14b7aa6 100644 --- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java @@ -27,6 +27,8 @@ import java.util.logging.Logger; import org.apache.asterix.algebra.operators.physical.IntervalIndexJoinPOperator; import org.apache.asterix.algebra.operators.physical.IntervalPartitionJoinPOperator; import org.apache.asterix.common.annotations.IntervalJoinExpressionAnnotation; +import org.apache.asterix.common.annotations.JoinIntervalMaxDurationExpressionAnnotation; +import org.apache.asterix.common.annotations.JoinRecordCountsExpressionAnnotation; import org.apache.asterix.om.functions.AsterixBuiltinFunctions; import org.apache.asterix.runtime.operators.joins.AfterIntervalMergeJoinCheckerFactory; import org.apache.asterix.runtime.operators.joins.BeforeIntervalMergeJoinCheckerFactory; @@ -89,7 +91,7 @@ public class JoinUtils { } else if (ijea.isPartitionJoin()) { // Overlapping Interval Partition. LOGGER.fine("Interval Join - Cluster Parititioning"); - setIntervalPartitionJoinOp(op, fi, sideLeft, sideRight, ijea.getRangeMap(), context); + setIntervalPartitionJoinOp(op, fi, sideLeft, sideRight, ijea.getRangeMap(), ijea, context); } else if (ijea.isSpatialJoin()) { // Spatial Partition. 
LOGGER.fine("Interval Join - Spatial Partitioning"); @@ -112,6 +114,7 @@ public class JoinUtils { return null; } + private static void setSortMergeIntervalJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi, List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IRangeMap rangeMap, IOptimizationContext context) { @@ -122,12 +125,20 @@ public class JoinUtils { private static void setIntervalPartitionJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi, List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IRangeMap rangeMap, - IOptimizationContext context) { + IntervalJoinExpressionAnnotation ijea, IOptimizationContext context) { + long leftCount = ijea.getLeftRecordCount() > 0 ? ijea.getLeftRecordCount() + : getCardinality(sideLeft, context); + long rightCount = ijea.getRightRecordCount() > 0 ? ijea.getRightRecordCount() + : getCardinality(sideRight, context); + long leftMaxDuration = ijea.getLeftMaxDuration() > 0 ? ijea.getLeftMaxDuration() + : getMaxDuration(sideLeft, context); + long rightMaxDuration = ijea.getRightMaxDuration() > 0 ? 
ijea.getRightMaxDuration() + : getMaxDuration(sideRight, context); + IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, rangeMap); op.setPhysicalOperator(new IntervalPartitionJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, - sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), - getCardinality(sideLeft, context), getCardinality(sideRight, context), - getMaxDuration(sideLeft, context), getMaxDuration(sideRight, context), + sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), leftCount, + rightCount, leftMaxDuration, rightMaxDuration, context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame(), mjcf, rangeMap)); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql index 6222c86..1fe23da 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql @@ -26,7 +26,7 @@ use dataverse TinyCollege; for $f in dataset Staff for $d in dataset Students -where /*+ interval-partition-join [10000, 11000, 12000, 14000, 15000] */ interval-overlapping($d.attendance, $f.employment) +where /*+ interval-partition-join [10000,11000,12000,14000,15000] 7 7 400 400 */ interval-overlapping($d.attendance, $f.employment) /*+ range ["F", "L", "R"] */ order 
by $f.name, $d.name return { "staff" : $f.name, "student" : $d.name } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql index 337221d..2057130 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql @@ -26,7 +26,7 @@ use dataverse TinyCollege; for $f in dataset Staff for $d in dataset Students -where /*+ interval-partition-join [10000, 11000, 12000, 14000, 15000] */ interval-overlapping($f.employment, $d.attendance) +where /*+ interval-partition-join [10000,11000,12000,14000,15000] 7 7 400 400 */ interval-overlapping($f.employment, $d.attendance) /*+ range ["F", "L", "R"] */ order by $f.name, $d.name return { "staff" : $f.name, "student" : $d.name } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java index 342b9e8..f2f325d 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java @@ -18,10 +18,11 @@ */ package org.apache.asterix.common.annotations; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractExpressionAnnotation; import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation; import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; -public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation { +public class IntervalJoinExpressionAnnotation extends AbstractExpressionAnnotation { private static final String RAW_HINT_STRING = "interval-raw-join"; private static final String PARTITION_HINT_STRING = "interval-partition-join"; @@ -30,19 +31,13 @@ public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation { private static final String INDEX_HINT_STRING = "interval-index-join"; public static final IntervalJoinExpressionAnnotation INSTANCE = new IntervalJoinExpressionAnnotation(); - private Object object; - private IRangeMap map; - private String joinType; + private IRangeMap map = null; + private String joinType = null; + private long leftMaxDuration = -1; + private long rightMaxDuration = -1; + private long leftRecordCount = -1; + private long rightRecordCount = -1; - @Override - public Object getObject() { - return object; - } - - @Override - public void setObject(Object object) { - this.object = object; - } @Override public IExpressionAnnotation copy() { @@ -51,15 +46,25 @@ public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation { return clone; } - public void setRangeMap(IRangeMap map) { - this.map = map; + @Override + public void setObject(Object object) { + super.setObject(object); + parseHint(); } - public IRangeMap getRangeMap() { - return map; + private void parseHint() { + 
String[] args = ((String) object).split(" "); + setJoinType(args[0]); + + if (joinType.equals(PARTITION_HINT_STRING) && args.length == 6) { + leftRecordCount = Long.valueOf(args[2]); + rightRecordCount = Long.valueOf(args[3]); + leftMaxDuration = Long.valueOf(args[4]); + rightMaxDuration = Long.valueOf(args[5]); + } } - public void setJoinType(String hint) { + private void setJoinType(String hint) { if (hint.startsWith(RAW_HINT_STRING)) { joinType = RAW_HINT_STRING; } else if (hint.startsWith(PARTITION_HINT_STRING)) { @@ -73,6 +78,30 @@ public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation { } } + public long getLeftMaxDuration() { + return leftMaxDuration; + } + + public long getRightMaxDuration() { + return rightMaxDuration; + } + + public long getLeftRecordCount() { + return leftRecordCount; + } + + public long getRightRecordCount() { + return rightRecordCount; + } + + public void setRangeMap(IRangeMap map) { + this.map = map; + } + + public IRangeMap getRangeMap() { + return map; + } + public String getRangeType() { return joinType; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java index e1dd1cb..de1e1fa 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java @@ -18,25 +18,14 @@ */ package org.apache.asterix.common.annotations; +import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractExpressionAnnotation; import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation; -public class SkipSecondaryIndexSearchExpressionAnnotation implements IExpressionAnnotation { +public class SkipSecondaryIndexSearchExpressionAnnotation extends AbstractExpressionAnnotation { public static final String HINT_STRING = "skip-index"; public static final SkipSecondaryIndexSearchExpressionAnnotation INSTANCE = new SkipSecondaryIndexSearchExpressionAnnotation(); - private Object object; - - @Override - public Object getObject() { - return object; - } - - @Override - public void setObject(Object object) { - this.object = object; - } - @Override public IExpressionAnnotation copy() { SkipSecondaryIndexSearchExpressionAnnotation clone = new SkipSecondaryIndexSearchExpressionAnnotation(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj index 80ddeef..bbba41d 100644 --- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj +++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj @@ -2112,10 +2112,10 @@ Expression FunctionCallExpr() throws ParseException: if (IntervalJoinExpressionAnnotation.isIntervalJoinHint(hint)) { IntervalJoinExpressionAnnotation ijea = IntervalJoinExpressionAnnotation.INSTANCE; ijea.setObject(hint); - ijea.setJoinType(hint); try { if (ijea.hasRangeArgument()) { - ijea.setRangeMap(RangeMapBuilder.parseHint(hint.substring(IntervalJoinExpressionAnnotation.getHintLength(hint)))); + String rangeHint = hint.substring(IntervalJoinExpressionAnnotation.getHintLength(hint), hint.indexOf(']', 0) + 1); + ijea.setRangeMap(RangeMapBuilder.parseHint(rangeHint)); } } catch (AsterixException e) { {if (true) throw new 
ParseException(e.getMessage());} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoin.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoin.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoin.java deleted file mode 100644 index 1bccbd2..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoin.java +++ /dev/null @@ -1,649 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.runtime.operators.joins.intervalpartition; - -import java.nio.ByteBuffer; -import java.util.BitSet; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map.Entry; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker; -import org.apache.commons.io.FileUtils; -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.io.RunFileReader; -import org.apache.hyracks.dataflow.common.io.RunFileWriter; -import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedMemoryConstrain; -import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager; -import org.apache.hyracks.dataflow.std.structures.TuplePointer; - -/** - * This class mainly applies one level of HHJ on a pair of - * relations. It is always called by the descriptor. 
- */ -public class IntervalPartitionJoin { - - // Used for special probe BigObject which can not be held into the Join memory - private FrameTupleAppender bigProbeFrameAppender; - - enum SIDE { - BUILD, - PROBE - } - - private IHyracksTaskContext ctx; - - private final String buildRelName; - private final String probeRelName; - - private final ITuplePartitionComputer buildHpc; - private final ITuplePartitionComputer probeHpc; - - private final RecordDescriptor buildRd; - private final RecordDescriptor probeRd; - - private RunFileWriter[] buildRFWriters; //writing spilled build partitions - private RunFileWriter[] probeRFWriters; //writing spilled probe partitions - - private final int memForJoin; - private final int k; - private final int numOfPartitions; - private InMemoryIntervalPartitionJoin[] inMemJoiner; //Used for joining resident partitions - - private VPartitionTupleBufferManager buildBufferManager; - private VPartitionTupleBufferManager probeBufferManager; - - private final FrameTupleAccessor accessorBuild; - private final FrameTupleAccessor accessorProbe; - - private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoin.class.getName()); - - // stats information - private IntervalPartitionJoinData ipjd; - - private IFrame reloadBuffer; - private TuplePointer tempPtr = new TuplePointer(); - - private IIntervalMergeJoinChecker imjc; - - public IntervalPartitionJoin(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions, - String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd, - RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) { - this.ctx = ctx; - this.memForJoin = memForJoin; - this.k = k; - this.buildRd = buildRd; - this.probeRd = probeRd; - this.buildHpc = buildHpc; - this.probeHpc = probeHpc; - this.imjc = imjc; - this.buildRelName = buildRelName; - this.probeRelName = probeRelName; - - this.numOfPartitions = numOfPartitions; - 
this.buildRFWriters = new RunFileWriter[numOfPartitions]; - this.probeRFWriters = new RunFileWriter[numOfPartitions]; - this.inMemJoiner = new InMemoryIntervalPartitionJoin[numOfPartitions]; - - this.accessorBuild = new FrameTupleAccessor(buildRd); - this.accessorProbe = new FrameTupleAccessor(probeRd); - - ipjd = new IntervalPartitionJoinData(k, imjc, numOfPartitions); - } - - public void initBuild() throws HyracksDataException { - buildBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions, - memForJoin * ctx.getInitialFrameSize()); - } - - private IPartitionedMemoryConstrain getPartitionMemoryConstrain() { - return VPartitionTupleBufferManager.NO_CONSTRAIN; - } - - public void build(ByteBuffer buffer) throws HyracksDataException { - accessorBuild.reset(buffer); - int tupleCount = accessorBuild.getTupleCount(); - - for (int i = 0; i < tupleCount; ++i) { - int pid = buildHpc.partition(accessorBuild, i, k); - processTuple(i, pid); - ipjd.buildIncrementCount(pid); - } - } - - public void closeBuild() throws HyracksDataException { - int inMemoryPartitions = 0; - int totalBuildPartitions = 0; - flushAndClearBuildSpilledPartition(); - - // Trying to bring back as many spilled partitions as possible, making them resident - bringBackSpilledPartitionIfHasMoreMemory(); - - // Update build partition join map based on partitions with actual data. - for (int i = ipjd.buildNextInMemory(0); i >= 0; i = ipjd.buildNextInMemory(i + 1)) { - if (ipjd.buildGetCount(i) == 0) { - ipjd.buildRemoveFromJoin(i); - } else if (ipjd.buildGetCount(i) > 0) { - // Set up build memory for processing joins for partitions in memory. - createInMemoryJoiner(i); - inMemoryPartitions++; - totalBuildPartitions += ipjd.buildGetCount(i); - } - } - - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("IntervalPartitionJoin has closed the build phase. 
Total tuples: " + totalBuildPartitions - + ", In memory partitions: " + inMemoryPartitions + ", Spilled partitions: " - + ipjd.buildGetSpilledCount()); - } - } - - private void processTuple(int tid, int pid) throws HyracksDataException { - while (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) { - int victimPartition = selectPartitionToSpill(); - if (victimPartition < 0) { - throw new HyracksDataException( - "No more space left in the memory buffer, please give join more memory budgets."); - } - spillPartition(victimPartition); - } - } - - private int selectPartitionToSpill() { - int partitionToSpill = selectLargestSpilledPartition(); - int maxToSpillPartSize = 0; - if (partitionToSpill < 0 || (maxToSpillPartSize = buildBufferManager.getPhysicalSize(partitionToSpill)) == ctx - .getInitialFrameSize()) { - int partitionInMem = selectNextInMemoryPartitionToSpill(); - if (partitionInMem >= 0 && buildBufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) { - partitionToSpill = partitionInMem; - } - } - return partitionToSpill; - } - - /** - * Select next partition to spill. The partitions have been numbered in the order they should be spilled. 
- * - * @return - */ - private int selectNextInMemoryPartitionToSpill() { - for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) { - if (!ipjd.buildIsSpilled(i) && buildBufferManager.getPhysicalSize(i) > 0) { - return i; - } - } - return -1; - } - - private int selectLargestSpilledPartition() { - int pid = -1; - int max = 0; - for (int i = ipjd.buildNextSpilled(0); i >= 0; i = ipjd.buildNextSpilled(i + 1)) { - int partSize = buildBufferManager.getPhysicalSize(i); - if (partSize > max) { - max = partSize; - pid = i; - } - } - return pid; - } - - private void spillPartition(int pid) throws HyracksDataException { - RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD); - buildBufferManager.flushPartition(pid, writer); - buildBufferManager.clearPartition(pid); - ipjd.buildSpill(pid); - } - - private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException { - RunFileWriter[] runFileWriters = null; - String refName = null; - switch (whichSide) { - case BUILD: - runFileWriters = buildRFWriters; - refName = buildRelName; - break; - case PROBE: - refName = probeRelName; - runFileWriters = probeRFWriters; - break; - default: - } - RunFileWriter writer = runFileWriters[pid]; - if (writer == null) { - FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName); - writer = new RunFileWriter(file, ctx.getIOManager()); - writer.open(); - runFileWriters[pid] = writer; - } - return writer; - } - - public void clearBuildMemory() throws HyracksDataException { - for (int pid = 0; pid < numOfPartitions; ++pid) { - if (buildBufferManager.getNumTuples(pid) > 0) { - buildBufferManager.clearPartition(pid); - ipjd.buildRemoveFromJoin(pid); - } - } - } - - private void flushAndClearBuildSpilledPartition() throws HyracksDataException { - for (int pid = ipjd.buildNextSpilled(0); pid >= 0; pid = ipjd.buildNextSpilled(pid + 1)) { - if 
(buildBufferManager.getNumTuples(pid) > 0) { - buildBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD)); - buildBufferManager.clearPartition(pid); - buildRFWriters[pid].close(); - } - } - } - - private void flushAndClearProbeSpilledPartition() throws HyracksDataException { - for (int pid = 0; pid < numOfPartitions; ++pid) { - if (probeBufferManager.getNumTuples(pid) > 0) { - probeBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE)); - probeBufferManager.clearPartition(pid); - probeRFWriters[pid].close(); - } - } - } - - private void bringBackSpilledPartitionIfHasMoreMemory() throws HyracksDataException { - // we need number of |spilledPartitions| buffers to store the probe data - int freeSpace = (memForJoin - ipjd.buildGetSpilledCount()) * ctx.getInitialFrameSize(); - for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) { - freeSpace -= buildBufferManager.getPhysicalSize(i); - } - - int pid = 0; - while ((pid = selectPartitionsToReload(freeSpace, pid)) >= 0) { - if (!loadPartitionInMem(pid, buildRFWriters[pid])) { - return; - } - freeSpace -= buildBufferManager.getPhysicalSize(pid); - } - } - - private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException { - RunFileReader r = wr.createDeleteOnCloseReader(); - r.open(); - if (reloadBuffer == null) { - reloadBuffer = new VSizeFrame(ctx); - } - while (r.nextFrame(reloadBuffer)) { - accessorBuild.reset(reloadBuffer.getBuffer()); - for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) { - if (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) { - // for some reason (e.g. 
due to fragmentation) if the inserting failed, we need to clear the occupied frames - buildBufferManager.clearPartition(pid); - r.close(); - return false; - } - } - } - - r.close(); - ipjd.buildLoad(pid); - buildRFWriters[pid] = null; - return true; - } - - private int selectPartitionsToReload(int freeSpace, int pid) { - for (int id = ipjd.buildNextSpilled(0); id >= 0; id = ipjd.buildNextSpilled(id + 1)) { - assert buildRFWriters[id].getFileSize() > 0 : "How comes a spilled partition have size 0?"; - if (freeSpace >= buildRFWriters[id].getFileSize()) { - return id; - } - } - return -1; - } - - private void createInMemoryJoiner(int pid) throws HyracksDataException { - this.inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx, - buildBufferManager.getPartitionFrameBufferManager(pid), imjc, buildRd, probeRd); - } - - private void closeInMemoryJoiner(int pid, IFrameWriter writer) throws HyracksDataException { - this.inMemJoiner[pid].closeJoin(writer); - this.inMemJoiner[pid] = null; - } - - public void initProbe() throws HyracksDataException { - int probeMemory = numOfPartitions > memForJoin ? 
memForJoin : numOfPartitions; - probeBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions, - (probeMemory) * ctx.getInitialFrameSize()); - - probeRFWriters = new RunFileWriter[numOfPartitions]; - } - - public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException { - accessorProbe.reset(buffer); - int tupleCount = accessorProbe.getTupleCount(); - - for (int i = 0; i < tupleCount; ++i) { - int pid = probeHpc.partition(accessorProbe, i, k); - if (!ipjd.hasProbeJoinMap(pid)) { - // Set probe join map - ipjd.setProbeJoinMap(pid, - IntervalPartitionUtil.getProbeJoinPartitions(pid, ipjd.buildPSizeInTups, imjc, k)); - } - - // Tuple has potential match from build phase - if (!ipjd.isProbeJoinMapEmpty(pid)) { - if (ipjd.probeHasSpilled(pid)) { - // pid is Spilled - while (!probeBufferManager.insertTuple(pid, accessorProbe, i, tempPtr)) { - int victim = pid; - if (probeBufferManager.getNumTuples(pid) == 0) { - // current pid is empty, choose the biggest one - victim = selectLargestSpilledPartition(); - } - if (victim < 0) { - // current tuple is too big for all the free space - flushBigProbeObjectToDisk(pid, accessorProbe, i); - break; - } - RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE); - probeBufferManager.flushPartition(victim, runFileWriter); - probeBufferManager.clearPartition(victim); - } - } - for (Iterator<Integer> pidIterator = ipjd.getProbeJoinMap(pid); pidIterator.hasNext();) { - // pid has join partitions that are Resident - int j = pidIterator.next(); - if (inMemJoiner[j] != null) { - inMemJoiner[j].join(accessorProbe, i, writer); - } - } - } - ipjd.probeIncrementCount(pid); - } - } - - public void closeProbe(IFrameWriter writer) throws HyracksDataException { - // We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use) - for (int i = 0; i < inMemJoiner.length; ++i) { - if 
(inMemJoiner[i] != null) { - closeInMemoryJoiner(i, writer); - ipjd.buildLogJoined(i); - } - } - clearBuildMemory(); - flushAndClearProbeSpilledPartition(); - probeBufferManager.close(); - probeBufferManager = null; - } - - private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i) - throws HyracksDataException { - if (bigProbeFrameAppender == null) { - bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx)); - } - RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE); - if (!bigProbeFrameAppender.append(accessorProbe, i)) { - throw new HyracksDataException("The given tuple is too big"); - } - bigProbeFrameAppender.write(runFileWriter, true); - } - - public RunFileReader getBuildRFReader(int pid) throws HyracksDataException { - return (buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader(); - } - - public RunFileReader getProbeRFReader(int pid) throws HyracksDataException { - return (probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader(); - } - - public void joinSpilledPartitions(IFrameWriter writer) throws HyracksDataException { - LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap; - if (reloadBuffer == null) { - reloadBuffer = new VSizeFrame(ctx); - } - HashSet<Integer> inMemory = new HashSet<>(); - while (ipjd.buildGetSpilledCount() > 0) { - // Load back spilled build partitions. - // TODO only load partition required for spill join. Consider both sides. - bringBackSpilledPartitionIfHasMoreMemory(); - - probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap(); - - // Create in memory joiners. - for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd - .buildNextInMemoryWithResults(pid + 1)) { - createInMemoryJoiner(pid); - inMemory.add(pid); - } - - // Join all build partitions with disk probe partitions. 
- for (Entry<Integer, LinkedHashSet<Integer>> entry : probeInMemoryJoinMap.entrySet()) { - if (ipjd.probeGetCount(entry.getKey()) > 0 && probeInMemoryJoinMap.get(entry.getKey()).isEmpty()) { - RunFileReader pReader = getProbeRFReader(entry.getKey()); - pReader.open(); - while (pReader.nextFrame(reloadBuffer)) { - accessorProbe.reset(reloadBuffer.getBuffer()); - for (int i = 0; i < accessorProbe.getTupleCount(); ++i) { - // Tuple has potential match from build phase - for (Integer j : probeInMemoryJoinMap.get(entry.getKey())) { - // j has join partitions that are Resident - if (inMemJoiner[j] != null) { - inMemJoiner[j].join(accessorProbe, i, writer); - } - } - } - } - pReader.close(); - } - } - - // Clean up build memory. - for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd - .buildNextInMemoryWithResults(pid + 1)) { - closeInMemoryJoiner(pid, writer); - ipjd.buildLogJoined(pid); - } - inMemory.clear(); - clearBuildMemory(); - } - } - - class IntervalPartitionJoinData { - private LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap; - - private int[] buildPSizeInTups; - private int[] probePSizeInTups; - - private BitSet buildJoinedCompleted; //0=waiting, 1=joined - private BitSet buildSpilledStatus; //0=resident, 1=spilled - private BitSet buildInMemoryStatus; //0=unknown, 1=resident - private BitSet probeSpilledStatus; //0=resident, 1=spilled - - public IntervalPartitionJoinData(int k, IIntervalMergeJoinChecker imjc, int numberOfPartitions) { - probeJoinMap = new LinkedHashMap<>(); - - buildPSizeInTups = new int[numberOfPartitions]; - probePSizeInTups = new int[numberOfPartitions]; - - buildJoinedCompleted = new BitSet(numberOfPartitions); - buildInMemoryStatus = new BitSet(numberOfPartitions); - buildSpilledStatus = new BitSet(numberOfPartitions); - probeSpilledStatus = new BitSet(numberOfPartitions); - } - - public LinkedHashMap<Integer, LinkedHashSet<Integer>> probeGetInMemoryJoinMap() { - return 
IntervalPartitionUtil.getInMemorySpillJoinMap(probeJoinMap, buildInMemoryStatus, probeSpilledStatus); - } - - public boolean hasProbeJoinMap(int pid) { - return probeJoinMap.containsKey(pid); - } - - public boolean isProbeJoinMapEmpty(int pid) { - return probeJoinMap.get(pid).isEmpty(); - } - - public Iterator<Integer> getProbeJoinMap(int pid) { - return probeJoinMap.get(pid).iterator(); - } - - public void setProbeJoinMap(int pid, LinkedHashSet<Integer> map) { - probeJoinMap.put(new Integer(pid), map); - for (Integer i : map) { - if (buildIsSpilled(i)) { - // Build join partition has spilled. Now spill the probe also. - probeSpilledStatus.set(pid); - } - } - } - - public void buildIncrementCount(int pid) { - buildInMemoryStatus.set(pid); - buildPSizeInTups[pid]++; - } - - public int buildGetCount(int pid) { - return buildPSizeInTups[pid]; - } - - public void buildLogJoined(int pid) { - buildSpilledStatus.clear(pid); - buildJoinedCompleted.set(pid); - } - - public void buildRemoveFromJoin(int pid) { - buildSpilledStatus.clear(pid); - buildJoinedCompleted.set(pid); - } - - public boolean buildHasBeenJoined(int pid) { - return buildJoinedCompleted.get(pid); - } - - public int buildGetSpilledCount() { - return buildSpilledStatus.cardinality(); - } - - public void buildSpill(int pid) { - buildInMemoryStatus.clear(pid); - buildSpilledStatus.set(pid); - } - - public void buildLoad(int pid) { - buildInMemoryStatus.set(pid); - buildSpilledStatus.clear(pid); - } - - public boolean buildIsSpilled(int pid) { - return buildSpilledStatus.get(pid); - } - - public int buildNextSpilled(int pid) { - return buildSpilledStatus.nextSetBit(pid); - } - - public int buildNextInMemoryWithResults(int pid) { - int nextPid = buildNextInMemory(pid); - do { - if (nextPid < 0 || buildGetCount(nextPid) > 0) { - return nextPid; - } - nextPid = buildNextInMemory(nextPid + 1); - } while (nextPid >= 0); - return -1; - } - - public int buildNextInMemory(int pid) { - int nextPid = 
buildSpilledStatus.nextClearBit(pid); - if (nextPid >= numOfPartitions) { - return -1; - } - do { - if (!buildHasBeenJoined(nextPid)) { - return nextPid; - } - nextPid = buildSpilledStatus.nextClearBit(nextPid + 1); - } while (nextPid >= 0 && nextPid < numOfPartitions); - return -1; - } - - public void probeIncrementCount(int pid) { - probePSizeInTups[pid]++; - } - - public int probeGetCount(int pid) { - return probePSizeInTups[pid]; - } - - public void probeSpill(int pid) { - probeSpilledStatus.set(pid); - } - - public boolean probeHasSpilled(int pid) { - return probeSpilledStatus.get(pid); - } - - public int buildGetMaxPartitionSize() { - int max = buildPSizeInTups[0]; - for (int i = 1; i < buildPSizeInTups.length; i++) { - if (buildPSizeInTups[i] > max) { - max = buildPSizeInTups[i]; - } - } - return max; - } - - public int probeGetMaxPartitionSize() { - int max = probePSizeInTups[0]; - for (int i = 1; i < probePSizeInTups.length; i++) { - if (probePSizeInTups[i] > max) { - max = probePSizeInTups[i]; - } - } - return max; - } - - } - - public void closeAndDeleteRunFiles() throws HyracksDataException { - for (RunFileWriter rfw : buildRFWriters) { - if (rfw != null) { - FileUtils.deleteQuietly(rfw.getFileReference().getFile()); - } - } - for (RunFileWriter rfw : probeRFWriters) { - if (rfw != null) { - FileUtils.deleteQuietly(rfw.getFileReference().getFile()); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java index 
21e07a5..0dd358c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java @@ -57,10 +57,10 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes private final int[] probeKeys; private final int[] buildKeys; - private final int probeTupleCount; - private final int probeMaxDuration; - private final int buildTupleCount; - private final int buildMaxDuration; + private final long probeTupleCount; + private final long probeMaxDuration; + private final long buildTupleCount; + private final long buildMaxDuration; private final int avgTuplesPerFrame; private final int probeKey; private final int buildKey; @@ -69,8 +69,8 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName()); - public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int leftTupleCount, - int rightTupleCount, int leftMaxDuration, int rightMaxDuration, int avgTuplesPerFrame, int[] leftKeys, + public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, long leftTupleCount, + long rightTupleCount, long leftMaxDuration, long rightMaxDuration, int avgTuplesPerFrame, int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf, IRangeMap rangeMap) { super(spec, 2, 1); @@ -108,7 +108,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes } public static class BuildAndPartitionTaskState extends AbstractStateObject { - private IntervalPartitionJoin ipj; + private IntervalPartitionJoiner ipj; private int intervalPartitions; private int partition; private 
int k; @@ -169,7 +169,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes state.intervalPartitions = IntervalPartitionUtil.getMaxPartitions(state.k); state.memoryForJoin = memsize; IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition); - state.ipj = new IntervalPartitionJoin(ctx, state.memoryForJoin, state.k, state.intervalPartitions, + state.ipj = new IntervalPartitionJoiner(ctx, state.memoryForJoin, state.k, state.intervalPartitions, BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc); state.ipj.initBuild(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java new file mode 100644 index 0000000..5df7b0a --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java @@ -0,0 +1,649 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.operators.joins.intervalpartition; + +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map.Entry; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker; +import org.apache.commons.io.FileUtils; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.io.RunFileReader; +import org.apache.hyracks.dataflow.common.io.RunFileWriter; +import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedMemoryConstrain; +import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager; +import org.apache.hyracks.dataflow.std.structures.TuplePointer; + +/** + * This class mainly applies one level of HHJ on a pair of + * relations. It is always called by the descriptor. 
+ */ +public class IntervalPartitionJoiner { + + // Used for special probe BigObject which can not be held into the Join memory + private FrameTupleAppender bigProbeFrameAppender; + + enum SIDE { + BUILD, + PROBE + } + + private IHyracksTaskContext ctx; + + private final String buildRelName; + private final String probeRelName; + + private final ITuplePartitionComputer buildHpc; + private final ITuplePartitionComputer probeHpc; + + private final RecordDescriptor buildRd; + private final RecordDescriptor probeRd; + + private RunFileWriter[] buildRFWriters; //writing spilled build partitions + private RunFileWriter[] probeRFWriters; //writing spilled probe partitions + + private final int memForJoin; + private final int k; + private final int numOfPartitions; + private InMemoryIntervalPartitionJoin[] inMemJoiner; //Used for joining resident partitions + + private VPartitionTupleBufferManager buildBufferManager; + private VPartitionTupleBufferManager probeBufferManager; + + private final FrameTupleAccessor accessorBuild; + private final FrameTupleAccessor accessorProbe; + + private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName()); + + // stats information + private IntervalPartitionJoinData ipjd; + + private IFrame reloadBuffer; + private TuplePointer tempPtr = new TuplePointer(); + + private IIntervalMergeJoinChecker imjc; + + public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions, + String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd, + RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) { + this.ctx = ctx; + this.memForJoin = memForJoin; + this.k = k; + this.buildRd = buildRd; + this.probeRd = probeRd; + this.buildHpc = buildHpc; + this.probeHpc = probeHpc; + this.imjc = imjc; + this.buildRelName = buildRelName; + this.probeRelName = probeRelName; + + this.numOfPartitions = numOfPartitions; + 
this.buildRFWriters = new RunFileWriter[numOfPartitions]; + this.probeRFWriters = new RunFileWriter[numOfPartitions]; + this.inMemJoiner = new InMemoryIntervalPartitionJoin[numOfPartitions]; + + this.accessorBuild = new FrameTupleAccessor(buildRd); + this.accessorProbe = new FrameTupleAccessor(probeRd); + + ipjd = new IntervalPartitionJoinData(k, imjc, numOfPartitions); + } + + public void initBuild() throws HyracksDataException { + buildBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions, + memForJoin * ctx.getInitialFrameSize()); + } + + private IPartitionedMemoryConstrain getPartitionMemoryConstrain() { + return VPartitionTupleBufferManager.NO_CONSTRAIN; + } + + public void build(ByteBuffer buffer) throws HyracksDataException { + accessorBuild.reset(buffer); + int tupleCount = accessorBuild.getTupleCount(); + + for (int i = 0; i < tupleCount; ++i) { + int pid = buildHpc.partition(accessorBuild, i, k); + processTuple(i, pid); + ipjd.buildIncrementCount(pid); + } + } + + public void closeBuild() throws HyracksDataException { + int inMemoryPartitions = 0; + int totalBuildPartitions = 0; + flushAndClearBuildSpilledPartition(); + + // Trying to bring back as many spilled partitions as possible, making them resident + bringBackSpilledPartitionIfHasMoreMemory(); + + // Update build partition join map based on partitions with actual data. + for (int i = ipjd.buildNextInMemory(0); i >= 0; i = ipjd.buildNextInMemory(i + 1)) { + if (ipjd.buildGetCount(i) == 0) { + ipjd.buildRemoveFromJoin(i); + } else if (ipjd.buildGetCount(i) > 0) { + // Set up build memory for processing joins for partitions in memory. + createInMemoryJoiner(i); + inMemoryPartitions++; + totalBuildPartitions += ipjd.buildGetCount(i); + } + } + + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("IntervalPartitionJoin has closed the build phase. 
Total tuples: " + totalBuildPartitions + + ", In memory partitions: " + inMemoryPartitions + ", Spilled partitions: " + + ipjd.buildGetSpilledCount()); + } + } + + private void processTuple(int tid, int pid) throws HyracksDataException { + while (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) { + int victimPartition = selectPartitionToSpill(); + if (victimPartition < 0) { + throw new HyracksDataException( + "No more space left in the memory buffer, please give join more memory budgets."); + } + spillPartition(victimPartition); + } + } + + private int selectPartitionToSpill() { + int partitionToSpill = selectLargestSpilledPartition(); + int maxToSpillPartSize = 0; + if (partitionToSpill < 0 || (maxToSpillPartSize = buildBufferManager.getPhysicalSize(partitionToSpill)) == ctx + .getInitialFrameSize()) { + int partitionInMem = selectNextInMemoryPartitionToSpill(); + if (partitionInMem >= 0 && buildBufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) { + partitionToSpill = partitionInMem; + } + } + return partitionToSpill; + } + + /** + * Select next partition to spill. The partitions have been numbered in the order they should be spilled. 
+ * + * @return + */ + private int selectNextInMemoryPartitionToSpill() { + for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) { + if (!ipjd.buildIsSpilled(i) && buildBufferManager.getPhysicalSize(i) > 0) { + return i; + } + } + return -1; + } + + private int selectLargestSpilledPartition() { + int pid = -1; + int max = 0; + for (int i = ipjd.buildNextSpilled(0); i >= 0; i = ipjd.buildNextSpilled(i + 1)) { + int partSize = buildBufferManager.getPhysicalSize(i); + if (partSize > max) { + max = partSize; + pid = i; + } + } + return pid; + } + + private void spillPartition(int pid) throws HyracksDataException { + RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD); + buildBufferManager.flushPartition(pid, writer); + buildBufferManager.clearPartition(pid); + ipjd.buildSpill(pid); + } + + private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException { + RunFileWriter[] runFileWriters = null; + String refName = null; + switch (whichSide) { + case BUILD: + runFileWriters = buildRFWriters; + refName = buildRelName; + break; + case PROBE: + refName = probeRelName; + runFileWriters = probeRFWriters; + break; + default: + } + RunFileWriter writer = runFileWriters[pid]; + if (writer == null) { + FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName); + writer = new RunFileWriter(file, ctx.getIOManager()); + writer.open(); + runFileWriters[pid] = writer; + } + return writer; + } + + public void clearBuildMemory() throws HyracksDataException { + for (int pid = 0; pid < numOfPartitions; ++pid) { + if (buildBufferManager.getNumTuples(pid) > 0) { + buildBufferManager.clearPartition(pid); + ipjd.buildRemoveFromJoin(pid); + } + } + } + + private void flushAndClearBuildSpilledPartition() throws HyracksDataException { + for (int pid = ipjd.buildNextSpilled(0); pid >= 0; pid = ipjd.buildNextSpilled(pid + 1)) { + if 
(buildBufferManager.getNumTuples(pid) > 0) { + buildBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD)); + buildBufferManager.clearPartition(pid); + buildRFWriters[pid].close(); + } + } + } + + private void flushAndClearProbeSpilledPartition() throws HyracksDataException { + for (int pid = 0; pid < numOfPartitions; ++pid) { + if (probeBufferManager.getNumTuples(pid) > 0) { + probeBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE)); + probeBufferManager.clearPartition(pid); + probeRFWriters[pid].close(); + } + } + } + + private void bringBackSpilledPartitionIfHasMoreMemory() throws HyracksDataException { + // we need number of |spilledPartitions| buffers to store the probe data + int freeSpace = (memForJoin - ipjd.buildGetSpilledCount()) * ctx.getInitialFrameSize(); + for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) { + freeSpace -= buildBufferManager.getPhysicalSize(i); + } + + int pid = 0; + while ((pid = selectPartitionsToReload(freeSpace, pid)) >= 0) { + if (!loadPartitionInMem(pid, buildRFWriters[pid])) { + return; + } + freeSpace -= buildBufferManager.getPhysicalSize(pid); + } + } + + private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException { + RunFileReader r = wr.createDeleteOnCloseReader(); + r.open(); + if (reloadBuffer == null) { + reloadBuffer = new VSizeFrame(ctx); + } + while (r.nextFrame(reloadBuffer)) { + accessorBuild.reset(reloadBuffer.getBuffer()); + for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) { + if (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) { + // for some reason (e.g. 
due to fragmentation) if the inserting failed, we need to clear the occupied frames + buildBufferManager.clearPartition(pid); + r.close(); + return false; + } + } + } + + r.close(); + ipjd.buildLoad(pid); + buildRFWriters[pid] = null; + return true; + } + + private int selectPartitionsToReload(int freeSpace, int pid) { + for (int id = ipjd.buildNextSpilled(0); id >= 0; id = ipjd.buildNextSpilled(id + 1)) { + assert buildRFWriters[id].getFileSize() > 0 : "How comes a spilled partition have size 0?"; + if (freeSpace >= buildRFWriters[id].getFileSize()) { + return id; + } + } + return -1; + } + + private void createInMemoryJoiner(int pid) throws HyracksDataException { + this.inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx, + buildBufferManager.getPartitionFrameBufferManager(pid), imjc, buildRd, probeRd); + } + + private void closeInMemoryJoiner(int pid, IFrameWriter writer) throws HyracksDataException { + this.inMemJoiner[pid].closeJoin(writer); + this.inMemJoiner[pid] = null; + } + + public void initProbe() throws HyracksDataException { + int probeMemory = numOfPartitions > memForJoin ? 
memForJoin : numOfPartitions; + probeBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions, + (probeMemory) * ctx.getInitialFrameSize()); + + probeRFWriters = new RunFileWriter[numOfPartitions]; + } + + public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException { + accessorProbe.reset(buffer); + int tupleCount = accessorProbe.getTupleCount(); + + for (int i = 0; i < tupleCount; ++i) { + int pid = probeHpc.partition(accessorProbe, i, k); + if (!ipjd.hasProbeJoinMap(pid)) { + // Set probe join map + ipjd.setProbeJoinMap(pid, + IntervalPartitionUtil.getProbeJoinPartitions(pid, ipjd.buildPSizeInTups, imjc, k)); + } + + // Tuple has potential match from build phase + if (!ipjd.isProbeJoinMapEmpty(pid)) { + if (ipjd.probeHasSpilled(pid)) { + // pid is Spilled + while (!probeBufferManager.insertTuple(pid, accessorProbe, i, tempPtr)) { + int victim = pid; + if (probeBufferManager.getNumTuples(pid) == 0) { + // current pid is empty, choose the biggest one + victim = selectLargestSpilledPartition(); + } + if (victim < 0) { + // current tuple is too big for all the free space + flushBigProbeObjectToDisk(pid, accessorProbe, i); + break; + } + RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE); + probeBufferManager.flushPartition(victim, runFileWriter); + probeBufferManager.clearPartition(victim); + } + } + for (Iterator<Integer> pidIterator = ipjd.getProbeJoinMap(pid); pidIterator.hasNext();) { + // pid has join partitions that are Resident + int j = pidIterator.next(); + if (inMemJoiner[j] != null) { + inMemJoiner[j].join(accessorProbe, i, writer); + } + } + } + ipjd.probeIncrementCount(pid); + } + } + + public void closeProbe(IFrameWriter writer) throws HyracksDataException { + // We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use) + for (int i = 0; i < inMemJoiner.length; ++i) { + if 
(inMemJoiner[i] != null) { + closeInMemoryJoiner(i, writer); + ipjd.buildLogJoined(i); + } + } + clearBuildMemory(); + flushAndClearProbeSpilledPartition(); + probeBufferManager.close(); + probeBufferManager = null; + } + + private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i) + throws HyracksDataException { + if (bigProbeFrameAppender == null) { + bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx)); + } + RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE); + if (!bigProbeFrameAppender.append(accessorProbe, i)) { + throw new HyracksDataException("The given tuple is too big"); + } + bigProbeFrameAppender.write(runFileWriter, true); + } + + public RunFileReader getBuildRFReader(int pid) throws HyracksDataException { + return (buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader(); + } + + public RunFileReader getProbeRFReader(int pid) throws HyracksDataException { + return (probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader(); + } + + public void joinSpilledPartitions(IFrameWriter writer) throws HyracksDataException { + LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap; + if (reloadBuffer == null) { + reloadBuffer = new VSizeFrame(ctx); + } + HashSet<Integer> inMemory = new HashSet<>(); + while (ipjd.buildGetSpilledCount() > 0) { + // Load back spilled build partitions. + // TODO only load partition required for spill join. Consider both sides. + bringBackSpilledPartitionIfHasMoreMemory(); + + probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap(); + + // Create in memory joiners. + for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd + .buildNextInMemoryWithResults(pid + 1)) { + createInMemoryJoiner(pid); + inMemory.add(pid); + } + + // Join all build partitions with disk probe partitions. 
+ for (Entry<Integer, LinkedHashSet<Integer>> entry : probeInMemoryJoinMap.entrySet()) { + if (ipjd.probeGetCount(entry.getKey()) > 0 && probeInMemoryJoinMap.get(entry.getKey()).isEmpty()) { + RunFileReader pReader = getProbeRFReader(entry.getKey()); + pReader.open(); + while (pReader.nextFrame(reloadBuffer)) { + accessorProbe.reset(reloadBuffer.getBuffer()); + for (int i = 0; i < accessorProbe.getTupleCount(); ++i) { + // Tuple has potential match from build phase + for (Integer j : probeInMemoryJoinMap.get(entry.getKey())) { + // j has join partitions that are Resident + if (inMemJoiner[j] != null) { + inMemJoiner[j].join(accessorProbe, i, writer); + } + } + } + } + pReader.close(); + } + } + + // Clean up build memory. + for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd + .buildNextInMemoryWithResults(pid + 1)) { + closeInMemoryJoiner(pid, writer); + ipjd.buildLogJoined(pid); + } + inMemory.clear(); + clearBuildMemory(); + } + } + + class IntervalPartitionJoinData { + private LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap; + + private int[] buildPSizeInTups; + private int[] probePSizeInTups; + + private BitSet buildJoinedCompleted; //0=waiting, 1=joined + private BitSet buildSpilledStatus; //0=resident, 1=spilled + private BitSet buildInMemoryStatus; //0=unknown, 1=resident + private BitSet probeSpilledStatus; //0=resident, 1=spilled + + public IntervalPartitionJoinData(int k, IIntervalMergeJoinChecker imjc, int numberOfPartitions) { + probeJoinMap = new LinkedHashMap<>(); + + buildPSizeInTups = new int[numberOfPartitions]; + probePSizeInTups = new int[numberOfPartitions]; + + buildJoinedCompleted = new BitSet(numberOfPartitions); + buildInMemoryStatus = new BitSet(numberOfPartitions); + buildSpilledStatus = new BitSet(numberOfPartitions); + probeSpilledStatus = new BitSet(numberOfPartitions); + } + + public LinkedHashMap<Integer, LinkedHashSet<Integer>> probeGetInMemoryJoinMap() { + return 
IntervalPartitionUtil.getInMemorySpillJoinMap(probeJoinMap, buildInMemoryStatus, probeSpilledStatus); + } + + public boolean hasProbeJoinMap(int pid) { + return probeJoinMap.containsKey(pid); + } + + public boolean isProbeJoinMapEmpty(int pid) { + return probeJoinMap.get(pid).isEmpty(); + } + + public Iterator<Integer> getProbeJoinMap(int pid) { + return probeJoinMap.get(pid).iterator(); + } + + public void setProbeJoinMap(int pid, LinkedHashSet<Integer> map) { + probeJoinMap.put(new Integer(pid), map); + for (Integer i : map) { + if (buildIsSpilled(i)) { + // Build join partition has spilled. Now spill the probe also. + probeSpilledStatus.set(pid); + } + } + } + + public void buildIncrementCount(int pid) { + buildInMemoryStatus.set(pid); + buildPSizeInTups[pid]++; + } + + public int buildGetCount(int pid) { + return buildPSizeInTups[pid]; + } + + public void buildLogJoined(int pid) { + buildSpilledStatus.clear(pid); + buildJoinedCompleted.set(pid); + } + + public void buildRemoveFromJoin(int pid) { + buildSpilledStatus.clear(pid); + buildJoinedCompleted.set(pid); + } + + public boolean buildHasBeenJoined(int pid) { + return buildJoinedCompleted.get(pid); + } + + public int buildGetSpilledCount() { + return buildSpilledStatus.cardinality(); + } + + public void buildSpill(int pid) { + buildInMemoryStatus.clear(pid); + buildSpilledStatus.set(pid); + } + + public void buildLoad(int pid) { + buildInMemoryStatus.set(pid); + buildSpilledStatus.clear(pid); + } + + public boolean buildIsSpilled(int pid) { + return buildSpilledStatus.get(pid); + } + + public int buildNextSpilled(int pid) { + return buildSpilledStatus.nextSetBit(pid); + } + + public int buildNextInMemoryWithResults(int pid) { + int nextPid = buildNextInMemory(pid); + do { + if (nextPid < 0 || buildGetCount(nextPid) > 0) { + return nextPid; + } + nextPid = buildNextInMemory(nextPid + 1); + } while (nextPid >= 0); + return -1; + } + + public int buildNextInMemory(int pid) { + int nextPid = 
buildSpilledStatus.nextClearBit(pid); + if (nextPid >= numOfPartitions) { + return -1; + } + do { + if (!buildHasBeenJoined(nextPid)) { + return nextPid; + } + nextPid = buildSpilledStatus.nextClearBit(nextPid + 1); + } while (nextPid >= 0 && nextPid < numOfPartitions); + return -1; + } + + public void probeIncrementCount(int pid) { + probePSizeInTups[pid]++; + } + + public int probeGetCount(int pid) { + return probePSizeInTups[pid]; + } + + public void probeSpill(int pid) { + probeSpilledStatus.set(pid); + } + + public boolean probeHasSpilled(int pid) { + return probeSpilledStatus.get(pid); + } + + public int buildGetMaxPartitionSize() { + int max = buildPSizeInTups[0]; + for (int i = 1; i < buildPSizeInTups.length; i++) { + if (buildPSizeInTups[i] > max) { + max = buildPSizeInTups[i]; + } + } + return max; + } + + public int probeGetMaxPartitionSize() { + int max = probePSizeInTups[0]; + for (int i = 1; i < probePSizeInTups.length; i++) { + if (probePSizeInTups[i] > max) { + max = probePSizeInTups[i]; + } + } + return max; + } + + } + + public void closeAndDeleteRunFiles() throws HyracksDataException { + for (RunFileWriter rfw : buildRFWriters) { + if (rfw != null) { + FileUtils.deleteQuietly(rfw.getFileReference().getFile()); + } + } + for (RunFileWriter rfw : probeRFWriters) { + if (rfw != null) { + FileUtils.deleteQuietly(rfw.getFileReference().getFile()); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java index e05c06e..c6e95e1 100644 --- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java @@ -37,15 +37,15 @@ public class IntervalPartitionUtil { private IntervalPartitionUtil() { } - public static int determineK(int countR, int maxDurationR, int countS, int maxDurationS, int avgTuplePerFrame) { + public static int determineK(long countR, long maxDurationR, long countS, long maxDurationS, int avgTuplePerFrame) { double deltaR = 1.0 / maxDurationR; double deltaS = 1.0 / maxDurationS; - int knMinusTwo = 0; - int knMinusOne = 0; - int kn = 1; + long knMinusTwo = 0; + long knMinusOne = 0; + long kn = 1; - int prn = determinePn(kn, countR, deltaR); + long prn = determinePn(kn, countR, deltaR); double tn = determineTn(kn, determinePn(kn, countS, deltaS)); while ((kn != knMinusOne) && (kn != knMinusTwo)) { @@ -55,21 +55,25 @@ public class IntervalPartitionUtil { prn = determinePn(kn, countR, deltaR); tn = determineTn(kn, determinePn(kn, countS, deltaS)); } - return kn; + if (kn > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int) kn; + } } - public static int determineKn(int countR, int countS, int avgTuplePerFrame, int prn, double tn) { + public static long determineKn(long countR, long countS, int avgTuplePerFrame, long prn, double tn) { double factorS = (3.0 * countS) / (2 * (C_IO + 2 * C_CPU) * tn); double factorR = (C_IO / avgTuplePerFrame) + ((4.0 * countR * C_CPU) / prn); - return (int) Math.cbrt(factorS * factorR); + return (long) Math.cbrt(factorS * factorR); } - public static int determinePn(int kn, int count, double delta) { + public static long determinePn(long kn, long count, double delta) { long knDelta = (long) Math.ceil(kn * delta); - return Math.min((int) ((kn * knDelta) + kn - ((knDelta * knDelta) / 2.0) - (knDelta / 2.0)), count); + return Math.min((long) 
((kn * knDelta) + kn - ((knDelta * knDelta) / 2.0) - (knDelta / 2.0)), count); } - public static double determineTn(int kn, int pn) { + public static double determineTn(long kn, long pn) { return pn / ((kn * kn + kn) / 2.0); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractExpressionAnnotation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractExpressionAnnotation.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractExpressionAnnotation.java new file mode 100644 index 0000000..de02572 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractExpressionAnnotation.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.algebricks.core.algebra.expressions; + +public abstract class AbstractExpressionAnnotation implements IExpressionAnnotation { + + protected Object object; + + @Override + public Object getObject() { + return object; + } + + @Override + public void setObject(Object object) { + this.object = object; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java index 140dfb1..3aa34c8 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java @@ -18,23 +18,11 @@ */ package org.apache.hyracks.algebricks.core.algebra.expressions; -public class ExpressionAnnotationNoCopyImpl implements IExpressionAnnotation { - - private Object object; +public class ExpressionAnnotationNoCopyImpl extends AbstractExpressionAnnotation { @Override public IExpressionAnnotation copy() { return this; } - @Override - public Object getObject() { - return object; - } - - @Override - public void setObject(Object object) { - this.object = object; - } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java index 5ee6b07..91c0a8b 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java @@ -18,23 +18,11 @@ */ package org.apache.hyracks.algebricks.core.algebra.expressions; -public class IndexedNLJoinExpressionAnnotation implements IExpressionAnnotation { +public class IndexedNLJoinExpressionAnnotation extends AbstractExpressionAnnotation { public static final String HINT_STRING = "indexnl"; public static final IndexedNLJoinExpressionAnnotation INSTANCE = new IndexedNLJoinExpressionAnnotation(); - private Object object; - - @Override - public Object getObject() { - return object; - } - - @Override - public void setObject(Object object) { - this.object = object; - } - @Override public IExpressionAnnotation copy() { IndexedNLJoinExpressionAnnotation clone = new IndexedNLJoinExpressionAnnotation();