new partition join algorithm
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0b900514 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0b900514 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0b900514 Branch: refs/heads/ecarm002/interval_join_merge Commit: 0b90051413c96ae52cc3136d41b5589275f11385 Parents: b34a426 Author: Preston Carman <prest...@apache.org> Authored: Fri Sep 30 13:56:13 2016 -0700 Committer: Preston Carman <prest...@apache.org> Committed: Fri Sep 30 13:56:13 2016 -0700 ---------------------------------------------------------------------- .../IntervalPartitionJoinPOperator.java | 2 +- .../InMemoryIntervalPartitionJoin.java | 98 --- ...IntervalPartitionJoinOperatorDescriptor.java | 355 +++++---- .../IntervalPartitionJoinTaskState.java | 33 + .../IntervalPartitionJoiner.java | 772 ++++--------------- .../IntervalPartitionUtil.java | 70 -- ...IntervalPartitionJoinOperatorDescriptor.java | 319 -------- .../IntervalPartitionJoinTaskState.java | 33 - .../IntervalPartitionJoiner.java | 288 ------- 9 files changed, 412 insertions(+), 1558 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java index af77a92..73d159e 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.logging.Logger; import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory; -import org.apache.asterix.runtime.operators.joins.intervalpartition2.IntervalPartitionJoinOperatorDescriptor; +import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionJoinOperatorDescriptor; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java deleted file mode 100644 index aeea209..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.operators.joins.intervalpartition; - -import java.util.logging.Logger; - -import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker; -import org.apache.hyracks.api.comm.IFrameTupleAccessor; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; -import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo; -import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager; - -public class InMemoryIntervalPartitionJoin { - - private static final Logger LOGGER = Logger.getLogger(InMemoryIntervalPartitionJoin.class.getName()); - - private final FrameTupleAccessor accessorBuild; - private final FrameTupleAppender appender; - private final IFrameBufferManager fbm; - private BufferInfo bufferInfo; - private final IIntervalMergeJoinChecker imjc; - - private long joinComparisonCount = 0; - private long joinResultCount = 0; - - public InMemoryIntervalPartitionJoin(IHyracksTaskContext ctx, IFrameBufferManager fbm, - IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd, RecordDescriptor probeRd) - throws HyracksDataException { - bufferInfo = new BufferInfo(null, -1, -1); - this.accessorBuild = new FrameTupleAccessor(buildRd); - appender = new FrameTupleAppender(new VSizeFrame(ctx)); - this.imjc = imjc; - this.fbm = fbm; - LOGGER.fine( - "InMemoryIntervalPartitionJoin has been created for Thread ID " + Thread.currentThread().getId() + "."); - } - - public long getComparisonCount() { - return joinComparisonCount; - } - - public long getResultCount() { - return joinResultCount; - } - - public void join(IFrameTupleAccessor accessorProbe, int probeTupleIndex, IFrameWriter writer) - throws HyracksDataException { - if (fbm.getNumFrames() != 0) { - fbm.resetIterator(); - int frameIndex = fbm.next(); - while (fbm.exists()) { - fbm.getFrame(frameIndex, bufferInfo); - accessorBuild.reset(bufferInfo.getBuffer()); - for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) { - if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, - false)) { - appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer); - } - joinComparisonCount++; - } - frameIndex = fbm.next(); - } - } - } - - public void closeJoin(IFrameWriter writer) throws HyracksDataException { - appender.write(writer, true); - } - - private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe, - int probeSidetIx, IFrameWriter writer) throws HyracksDataException { - FrameUtils.appendConcatToWriter(writer, appender, accessorBuild, buildSidetIx, accessorProbe, probeSidetIx); - joinResultCount++; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java index b4965ef..ddbe913 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java @@ -20,7 +20,6 @@ package org.apache.asterix.runtime.operators.joins.intervalpartition; import java.nio.ByteBuffer; -import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker; @@ -36,34 +35,29 @@ import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; -import org.apache.hyracks.dataflow.std.base.AbstractStateObject; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.RangeId; +import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage; +import org.apache.hyracks.dataflow.std.join.MergeJoinLocks; import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState; public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDescriptor { private static final long serialVersionUID = 1L; - private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0; - private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1; - - private static final String PROBE_REL = "RelR"; - private static final String BUILD_REL = "RelS"; - - private final int memsize; - private final int[] probeKeys; - private final int[] buildKeys; - + private static final int LEFT_ACTIVITY_ID = 0; + private static final int RIGHT_ACTIVITY_ID = 1; + private final int[] leftKeys; + private final int[] rightKeys; + private final int memoryForJoin; + private final IIntervalMergeJoinCheckerFactory imjcf; + private final RangeId rangeId; private final int k; private final int probeKey; private final int buildKey; - private final IIntervalMergeJoinCheckerFactory imjcf; - private final RangeId rangeId; private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName()); @@ -71,180 +65,253 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf, RangeId rangeId) { super(spec, 2, 1); - this.memsize = memoryForJoin; + recordDescriptors[0] = recordDescriptor; this.buildKey = leftKeys[0]; this.probeKey = rightKeys[0]; this.k = k; - this.buildKeys = leftKeys; - this.probeKeys = rightKeys; - recordDescriptors[0] = recordDescriptor; + this.leftKeys = leftKeys; + this.rightKeys = rightKeys; + this.memoryForJoin = memoryForJoin; this.imjcf = imjcf; this.rangeId = rangeId; } @Override public void contributeActivities(IActivityGraphBuilder builder) { - ActivityId p1Aid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID); - ActivityId p2Aid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID); - IActivity phase1 = new PartitionAndBuildActivityNode(p1Aid, p2Aid); - IActivity phase2 = new ProbeAndJoinActivityNode(p2Aid, p1Aid); + MergeJoinLocks locks = new MergeJoinLocks(); - builder.addActivity(this, phase1); - builder.addSourceEdge(0, phase1, 0); + ActivityId leftAid = new ActivityId(odId, LEFT_ACTIVITY_ID); + ActivityId rightAid = new ActivityId(odId, RIGHT_ACTIVITY_ID); - builder.addActivity(this, phase2); - builder.addSourceEdge(1, phase2, 0); + IActivity leftAN = new LeftJoinerActivityNode(leftAid, rightAid, locks); + IActivity rightAN = new RightDataActivityNode(rightAid, leftAid, locks); - builder.addBlockingEdge(phase1, phase2); + builder.addActivity(this, rightAN); + builder.addSourceEdge(1, rightAN, 0); - builder.addTargetEdge(0, phase2, 0); + builder.addActivity(this, leftAN); + builder.addSourceEdge(0, leftAN, 0); + builder.addTargetEdge(0, leftAN, 0); } - public static class BuildAndPartitionTaskState extends AbstractStateObject { - private IntervalPartitionJoiner ipj; - private int intervalPartitions; - private int partition; - private int k; - private int memoryForJoin; - - private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) { - super(jobId, taskId); - } - } - - private class PartitionAndBuildActivityNode extends AbstractActivityNode { + private class LeftJoinerActivityNode extends AbstractActivityNode { private static final long serialVersionUID = 1L; - private final ActivityId probeAid; + private final MergeJoinLocks locks; - public PartitionAndBuildActivityNode(ActivityId id, ActivityId probeAid) { + public LeftJoinerActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) { super(id); - this.probeAid = probeAid; + this.locks = locks; } @Override - public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) { - - final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); - final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0); - - return new AbstractUnaryInputSinkOperatorNodePushable() { - private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState( - ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition)); - private boolean failure = false; - - @Override - public void open() throws HyracksDataException { - if (memsize <= 2) { - // Dedicated buffers: One buffer to read and one buffer for output - failure = true; - throw new HyracksDataException("not enough memory for join"); - } - state.k = k; + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) + throws HyracksDataException { + locks.setPartitions(nPartitions); + final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); + return new LeftJoinerOperator(ctx, partition, inRecordDesc); + } - RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx); - long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(), - partition); - long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition); - ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k, - partitionStart, partitionEnd).createPartitioner(); - ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k, - partitionStart, partitionEnd).createPartitioner(); + private class LeftJoinerOperator extends AbstractUnaryInputUnaryOutputOperatorNodePushable { - state.partition = partition; - state.intervalPartitions = IntervalPartitionUtil.getMaxPartitions(state.k); - state.memoryForJoin = memsize; - IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition, ctx); - state.ipj = new IntervalPartitionJoiner(ctx, state.memoryForJoin, state.k, state.intervalPartitions, - BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc); - - state.ipj.buildInit(); - LOGGER.setLevel(Level.FINE); - System.out - .println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel()); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("IntervalPartitionJoin is starting the build phase with " + state.k - + " granules repesenting " + state.intervalPartitions + " interval partitions using " - + state.memoryForJoin + " frames for memory."); - } - } + private final IHyracksTaskContext ctx; + private final int partition; + private final RecordDescriptor leftRd; + private IntervalPartitionJoinTaskState state; + private boolean first = true; - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - state.ipj.buildStep(buffer); - } + public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) { + this.ctx = ctx; + this.partition = partition; + this.leftRd = inRecordDesc; + } - @Override - public void close() throws HyracksDataException { - if (!failure) { - state.ipj.buildClose(); - ctx.setStateObject(state); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("IntervalPartitionJoin closed its build phase"); + @Override + public void open() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + writer.open(); + state = new IntervalPartitionJoinTaskState(ctx.getJobletContext().getJobId(), + new TaskId(getActivityId(), partition));; + state.leftRd = leftRd; + ctx.setStateObject(state); + locks.getRight(partition).signal(); + + do { + // Continue after joiner created in right branch. + if (state.partitionJoiner == null) { + locks.getLeft(partition).await(); } - } + } while (state.partitionJoiner == null); + state.status.branch[LEFT_ACTIVITY_ID].setStageOpen(); + locks.getRight(partition).signal(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + locks.getLock(partition).unlock(); } - - @Override - public void fail() throws HyracksDataException { - failure = true; + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + locks.getLock(partition).lock(); + if (first) { + state.status.branch[LEFT_ACTIVITY_ID].setStageData(); + first = false; } - - }; - + try { + state.partitionJoiner.setFrame(LEFT_ACTIVITY_ID, buffer); + state.partitionJoiner.processLeftFrame(writer); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void fail() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.failed = true; + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void close() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.status.branch[LEFT_ACTIVITY_ID].noMore(); + if (state.failed) { + writer.fail(); + } else { + state.partitionJoiner.processLeftClose(writer); + writer.close(); + } + state.status.branch[LEFT_ACTIVITY_ID].setStageClose(); + locks.getRight(partition).signal(); + } finally { + locks.getLock(partition).unlock(); + } + } } } - private class ProbeAndJoinActivityNode extends AbstractActivityNode { + private class RightDataActivityNode extends AbstractActivityNode { private static final long serialVersionUID = 1L; - public ProbeAndJoinActivityNode(ActivityId id, ActivityId buildAid) { + private final ActivityId joinAid; + private final MergeJoinLocks locks; + + public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) { super(id); + this.joinAid = joinAid; + this.locks = locks; } @Override - public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { + locks.setPartitions(nPartitions); + RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); + return new RightDataOperator(ctx, partition, inRecordDesc); + } - return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { - private BuildAndPartitionTaskState state; - - @Override - public void open() throws HyracksDataException { - state = (BuildAndPartitionTaskState) ctx.getStateObject( - new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition)); - - writer.open(); - state.ipj.probeInit(); - - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("IntervalPartitionJoin is starting the probe phase."); - } - } + private class RightDataOperator extends AbstractUnaryInputSinkOperatorNodePushable { + + private int partition; + private IHyracksTaskContext ctx; + private final RecordDescriptor rightRd; + private IntervalPartitionJoinTaskState state; + private boolean first = true; + + public RightDataOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) { + this.ctx = ctx; + this.partition = partition; + this.rightRd = inRecordDesc; + } + + @Override + public void open() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + do { + // Wait for the state to be set in the context form Left. + state = (IntervalPartitionJoinTaskState) ctx.getStateObject(new TaskId(joinAid, partition)); + if (state == null) { + locks.getRight(partition).await(); + } + } while (state == null); + state.k = k; - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - state.ipj.probeStep(buffer, writer); + RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx); + long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(), + partition); + long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition); + ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k, + partitionStart, partitionEnd).createPartitioner(); + ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k, + partitionStart, partitionEnd).createPartitioner(); + IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, ctx); + + state.rightRd = rightRd; + state.partitionJoiner = new IntervalPartitionJoiner(ctx, memoryForJoin, partition, state.k, + state.status, locks, imjc, state.leftRd, state.rightRd, buildHpc, probeHpc); + state.status.branch[RIGHT_ACTIVITY_ID].setStageOpen(); + locks.getLeft(partition).signal(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + locks.getLock(partition).unlock(); } - - @Override - public void fail() throws HyracksDataException { - writer.fail(); + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + locks.getLock(partition).lock(); + if (first) { + state.status.branch[RIGHT_ACTIVITY_ID].setStageData(); + first = false; } - - @Override - public void close() throws HyracksDataException { - state.ipj.probeClose(writer); - state.ipj.joinSpilledPartitions(writer); - state.ipj.closeAndDeleteRunFiles(); - writer.close(); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("IntervalPartitionJoin closed its probe phase"); + try { + while (!state.status.continueRightLoad + && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) { + // Wait for the state to request right frame unless left has finished. + locks.getRight(partition).await(); } + state.partitionJoiner.setFrame(RIGHT_ACTIVITY_ID, buffer); + state.status.continueRightLoad = false; + locks.getLeft(partition).signal(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void fail() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.failed = true; + locks.getLeft(partition).signal(); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void close() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.status.branch[RIGHT_ACTIVITY_ID].setStageClose(); + locks.getLeft(partition).signal(); + } finally { + locks.getLock(partition).unlock(); } - }; + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinTaskState.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinTaskState.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinTaskState.java new file mode 100644 index 0000000..1939899 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinTaskState.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.operators.joins.intervalpartition; + +import org.apache.hyracks.api.dataflow.TaskId; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.dataflow.std.join.MergeJoinTaskState; + +public class IntervalPartitionJoinTaskState extends MergeJoinTaskState { + protected IntervalPartitionJoiner partitionJoiner; + public int k; + + public IntervalPartitionJoinTaskState(JobId jobId, TaskId taskId) { + super(jobId, taskId); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java index 9c5a872..984db20 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java @@ -18,21 +18,17 @@ */ package org.apache.asterix.runtime.operators.joins.intervalpartition; -import java.nio.ByteBuffer; -import java.util.BitSet; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map.Entry; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.TreeMap; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker; -import org.apache.commons.io.FileUtils; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameTupleAccessor; -import org.apache.hyracks.api.comm.IFrameTupleAppender; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -41,685 +37,251 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; import org.apache.hyracks.dataflow.common.io.RunFileReader; import org.apache.hyracks.dataflow.common.io.RunFileWriter; -import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedMemoryConstrain; +import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo; +import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager; import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager; +import org.apache.hyracks.dataflow.std.join.AbstractMergeJoiner; +import org.apache.hyracks.dataflow.std.join.MergeJoinLocks; +import org.apache.hyracks.dataflow.std.join.MergeStatus; +import org.apache.hyracks.dataflow.std.structures.RunFilePointer; import org.apache.hyracks.dataflow.std.structures.TuplePointer; -/** - * The Interval Partition Join runs in three stages: build, probe-in-memory, probe-spill. - * build: Saves all build partitions either to memory or disk. - * probe-in-memory: Joins all in memory partitions and saves the necessary partitions to disk. - * probe-spill: Spilled build partitions are loaded into memory and joined with all probe remaining partitions. - */ -public class IntervalPartitionJoiner { +public class IntervalPartitionJoiner extends AbstractMergeJoiner { private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName()); - private enum SIDE { - BUILD, - PROBE - } - - private IHyracksTaskContext ctx; - - private final String buildRelName; - private final String probeRelName; + private final RunFileWriter probeRunFileWriter; + private int probeRunFilePid = -1; private final ITuplePartitionComputer buildHpc; private final ITuplePartitionComputer probeHpc; - private final RecordDescriptor buildRd; - private final RecordDescriptor probeRd; - - private RunFileWriter[] buildRFWriters; //writing spilled build partitions - private RunFileWriter[] probeRFWriters; //writing spilled probe partitions - private final int buildMemory; private final int k; private final int numOfPartitions; - private InMemoryIntervalPartitionJoin[] inMemJoiner; //Used for joining resident partitions - - private VPartitionTupleBufferManager buildBufferManager; + private long buildSize = 0; + private long probeSize = 0; + private final TreeMap<RunFilePointer, Integer> probeRunFilePointers; + private final VPartitionTupleBufferManager buildBufferManager; + private final TuplePointer tempPtr = new TuplePointer(); + private final List<Integer> buildInMemoryPartitions; private final FrameTupleAccessor accessorBuild; - private final FrameTupleAccessor accessorProbe; - - // stats information - private IntervalPartitionJoinData ipjd; - - private IFrame reloadBuffer; - private TuplePointer tempPtr = new TuplePointer(); - - private IIntervalMergeJoinChecker imjc; + private BufferInfo bufferInfo; + private long spillWriteCount = 0; + private long spillReadCount = 0; private long joinComparisonCount = 0; private long joinResultCount = 0; - private long spillReadCount = 0; - private long spillWriteCount = 0; - private long buildSize; - private long probeSize; - private int tmp = -1; - - private RunFileWriter probeRunFileWriter = null; - private final IFrameTupleAppender probeRunFileAppender; - private int probeRunFilePid = -1; - - public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions, - String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd, - RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) - throws HyracksDataException { - this.ctx = ctx; - // TODO fix available memory size - this.buildMemory = memForJoin; - this.k = k; - this.buildRd = buildRd; - this.probeRd = probeRd; - this.buildHpc = buildHpc; - this.probeHpc = probeHpc; - this.imjc = imjc; - this.buildRelName = buildRelName; - this.probeRelName = probeRelName; + private final IIntervalMergeJoinChecker imjc; + private final FrameTupleAccessor accessorProbe; + private final IFrame reloadBuffer; + private boolean moreBuildProcessing = true; + private final List<IFrameBufferManager> fbms = new ArrayList<>(); - this.numOfPartitions = numOfPartitions; - this.buildRFWriters = new RunFileWriter[numOfPartitions]; - this.probeRFWriters = new RunFileWriter[numOfPartitions]; - this.inMemJoiner = new InMemoryIntervalPartitionJoin[numOfPartitions]; + public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memorySize, int partition, int k, MergeStatus status, + MergeJoinLocks locks, IIntervalMergeJoinChecker imjc, RecordDescriptor leftRd, RecordDescriptor rightRd, + ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) throws HyracksDataException { + super(ctx, partition, status, locks, leftRd, rightRd); - this.accessorBuild = new FrameTupleAccessor(buildRd); - this.accessorProbe = new FrameTupleAccessor(probeRd); + bufferInfo = new BufferInfo(null, -1, -1); + this.accessorProbe = new FrameTupleAccessor(leftRd); reloadBuffer = new VSizeFrame(ctx); - probeRunFileAppender = new FrameTupleAppender(new VSizeFrame(ctx)); - ipjd = new IntervalPartitionJoinData(k, imjc, numOfPartitions); - } - public void buildInit() throws HyracksDataException { + this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);; + this.imjc = imjc; + + // TODO fix available memory size + this.buildMemory = memorySize; buildBufferManager = new VPartitionTupleBufferManager(ctx, VPartitionTupleBufferManager.NO_CONSTRAIN, numOfPartitions, buildMemory * ctx.getInitialFrameSize()); - System.err.println("k: " + k); - buildSize = 0; - } - public void buildStep(ByteBuffer buffer) throws HyracksDataException { - accessorBuild.reset(buffer); - int tupleCount = accessorBuild.getTupleCount(); + this.k = k; + this.buildHpc = buildHpc; + this.probeHpc = probeHpc; - int pid; - for (int i = 0; i < tupleCount; ++i) { - pid = buildHpc.partition(accessorBuild, i, k); + FileReference file = ctx.getJobletContext().createManagedWorkspaceFile("IntervalPartitionJoiner"); + probeRunFileWriter = new RunFileWriter(file, ctx.getIOManager()); + probeRunFileWriter.open(); - if (tmp != pid) { - System.err.println("buildSize: " + buildSize + " pid: " + pid + " k: " + k + " pair: " - + IntervalPartitionUtil.getIntervalPartition(pid, k)); - tmp = pid; - } - processTuple(i, pid); - ipjd.buildIncrementCount(pid); - buildSize++; - } - } + probeRunFilePointers = new TreeMap<>(RunFilePointer.ASC); + buildInMemoryPartitions = new LinkedList<>(); - public void buildClose() throws HyracksDataException { - System.err.println("buildSize: " + buildSize); - - int inMemoryPartitions = 0; - int totalBuildPartitions = 0; - flushAndClearBuildSpilledPartition(); - - // Trying to bring back as many spilled partitions as possible, making them resident - bringBackSpilledPartitionIfHasMoreMemory(false); - - // Update build partition join map based on partitions with actual data. - for (int i = ipjd.buildNextInMemory(0); i >= 0; i = ipjd.buildNextInMemory(i + 1)) { - if (ipjd.buildGetCount(i) == 0) { - ipjd.buildRemoveFromJoin(i); - } else if (ipjd.buildGetCount(i) > 0) { - // Set up build memory for processing joins for partitions in memory. - createInMemoryJoiner(i); - inMemoryPartitions++; - totalBuildPartitions += ipjd.buildGetCount(i); - } - } + this.accessorBuild = new FrameTupleAccessor(rightRd); + LOGGER.setLevel(Level.FINE); + System.out.println("IntervalIndexJoiner: Logging level is: " + LOGGER.getLevel()); if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("IntervalPartitionJoin has closed the build phase. Total tuples: " + totalBuildPartitions - + ", In memory partitions: " + inMemoryPartitions + ", Spilled partitions: " - + ipjd.buildGetSpilledCount()); + LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize + + " frames of memory."); } } - private void processTuple(int tid, int pid) throws HyracksDataException { - while (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) { - int victimPartition = selectPartitionToSpill(); - if (victimPartition < 0) { - throw new HyracksDataException( - "No more space left in the memory buffer, please give join more memory budgets."); - } - spillPartition(victimPartition); - } - } - - private int selectPartitionToSpill() { - int partitionToSpill = selectLargestSpilledPartition(); - int maxToSpillPartSize = 0; - if (partitionToSpill < 0 || (maxToSpillPartSize = buildBufferManager.getPhysicalSize(partitionToSpill)) == ctx - .getInitialFrameSize()) { - int partitionInMem = selectNextInMemoryPartitionToSpill(); - if (partitionInMem >= 0 && buildBufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) { - partitionToSpill = partitionInMem; + @Override + public void processLeftFrame(IFrameWriter writer) throws HyracksDataException { + while (inputAccessor[LEFT_PARTITION].exists()) { + int pid = probeHpc.partition(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), k); + + if (probeRunFilePid != pid) { + // Log new partition locations. + RunFilePointer rfp = new RunFilePointer(probeRunFileWriter.getFileSize(), + inputAccessor[LEFT_PARTITION].getTupleId()); + probeRunFilePointers.put(rfp, pid); + probeRunFilePid = pid; } + inputAccessor[LEFT_PARTITION].next(); + probeSize++; } - return partitionToSpill; + inputBuffer[LEFT_PARTITION].rewind(); + probeRunFileWriter.nextFrame(inputBuffer[LEFT_PARTITION]); + spillWriteCount++; } - /** - * Select next partition to spill. The partitions have been numbered in the order they should be spilled. - * - * @return - */ - private int selectNextInMemoryPartitionToSpill() { - for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) { - if (!ipjd.buildIsSpilled(i) && buildBufferManager.getPhysicalSize(i) > 0) { - return i; - } - } - return -1; - } + @Override + public void processLeftClose(IFrameWriter writer) throws HyracksDataException { + joinLoopOnMemory(writer); - private int selectLargestSpilledPartition() { - int pid = -1; - int max = 0; - for (int i = ipjd.buildNextSpilled(0); i >= 0; i = ipjd.buildNextSpilled(i + 1)) { - int partSize = buildBufferManager.getPhysicalSize(i); - if (partSize > max) { - max = partSize; - pid = i; - } + // Flush result. + resultAppender.write(writer, true); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("IntervalPartitionJoiner statitics: " + k + " k, " + joinComparisonCount + " comparisons, " + + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read."); } - return pid; } - private void spillPartition(int pid) throws HyracksDataException { - RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD); - spillWriteCount += buildBufferManager.getNumFrames(pid); - buildBufferManager.flushPartition(pid, writer); - buildBufferManager.clearPartition(pid); - ipjd.buildSpill(pid); - } + private void joinLoopOnMemory(IFrameWriter writer) throws HyracksDataException { + RunFileReader pReader = probeRunFileWriter.createDeleteOnCloseReader(); + pReader.open(); + // Load first frame. + loadReaderNextFrame(pReader); - private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException { - RunFileWriter[] runFileWriters = null; - String refName = null; - switch (whichSide) { - case BUILD: - runFileWriters = buildRFWriters; - refName = buildRelName; - break; - case PROBE: - refName = probeRelName; - runFileWriters = probeRFWriters; - break; - default: - } - RunFileWriter writer = runFileWriters[pid]; - if (writer == null) { - FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName); - writer = new RunFileWriter(file, ctx.getIOManager()); - writer.open(); - runFileWriters[pid] = writer; - } - return writer; - } + while (moreBuildProcessing) { + fillMemory(); + joinMemoryBlockWithRunFile(writer, pReader); - public void clearBuildMemory() throws HyracksDataException { - for (int pid = 0; pid < numOfPartitions; ++pid) { - if (buildBufferManager.getNumTuples(pid) > 0) { + // Clean up + for (int pid : buildInMemoryPartitions) { buildBufferManager.clearPartition(pid); } + buildInMemoryPartitions.clear(); } - ipjd.buildClearMemory(); - } - - private void flushAndClearBuildSpilledPartition() throws HyracksDataException { - for (int pid = ipjd.buildNextSpilled(0); pid >= 0; pid = ipjd.buildNextSpilled(pid + 1)) { - if (buildBufferManager.getNumTuples(pid) > 0) { - spillWriteCount += buildBufferManager.getNumFrames(pid); - RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD); - buildBufferManager.flushPartition(pid, runFileWriter); - buildBufferManager.clearPartition(pid); - buildRFWriters[pid].close(); - } - } - } - - private void flushProbeSpilledPartition() throws HyracksDataException { - if (probeRunFileWriter != null) { - // flush previous runFile - probeRunFileAppender.write(probeRunFileWriter, true); - probeRunFileWriter.close(); - spillWriteCount++; - } - } - - private void bringBackSpilledPartitionIfHasMoreMemory(boolean partitalLoad) throws HyracksDataException { - int freeFrames = buildMemory; - for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) { - freeFrames -= buildBufferManager.getNumFrames(i); - } - - int pid = 0; - while ((pid = selectPartitionsToReload(freeFrames, pid, partitalLoad)) >= 0 && freeFrames > 0) { - if (pid == 225) { - int i = 0; - } - if (!loadPartitionInMem(pid, buildRFWriters[pid])) { - return; - } - freeFrames -= buildBufferManager.getNumFrames(pid); - } + pReader.close(); } - int buildParitialLoadPid = -1; - int buildParitialNextTid = -1; - long buildParitialResetReader = -1; - - private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException { - if (pid == 225) { - int i = 0; - } - RunFileReader r = wr.createReader(); - r.open(); - if (buildParitialLoadPid == pid && buildParitialResetReader > 0) { - r.reset(buildParitialResetReader); - } - int framesLoaded = 0; - while (r.nextFrame(reloadBuffer)) { - framesLoaded++; - accessorBuild.reset(reloadBuffer.getBuffer()); - spillReadCount++; - for (int tid = buildParitialNextTid > 0 ? buildParitialNextTid : 0; tid < accessorBuild - .getTupleCount(); tid++) { - if (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) { - // for some reason (e.g. due to fragmentation) if the inserting failed - // we need to start this partition from this location on the next round. - buildParitialLoadPid = pid; - buildParitialNextTid = tid; - buildParitialResetReader = r.getReadPointer(); - ipjd.buildLoad(pid); - createInMemoryJoiner(pid); - r.close(); - return false; + private void joinMemoryBlockWithRunFile(IFrameWriter writer, RunFileReader pReader) throws HyracksDataException { + // Join Disk partitions with Memory partitions + for (RunFilePointer probeId : probeRunFilePointers.navigableKeySet()) { + Pair<Integer, Integer> probe = IntervalPartitionUtil.getIntervalPartition(probeRunFilePointers.get(probeId), + k); + for (int buildId : buildInMemoryPartitions) { + Pair<Integer, Integer> build = IntervalPartitionUtil.getIntervalPartition(buildId, k); + if (imjc.compareIntervalPartition(probe.first, probe.second, build.first, build.second)) { + fbms.add(buildBufferManager.getPartitionFrameBufferManager(buildId)); } } - } - if (framesLoaded == 0) { - int t = 0; - } - - ipjd.buildLoad(pid); - createInMemoryJoiner(pid); - r.close(); - buildRFWriters[pid] = null; - buildParitialLoadPid = -1; - buildParitialNextTid = -1; - buildParitialResetReader = -1; - return true; - } - - private int selectPartitionsToReload(int freeFrames, int pid, boolean partitalLoad) { - int freeSpace = freeFrames * ctx.getInitialFrameSize(); - if (freeSpace > 0 && buildParitialLoadPid > 0 && buildParitialResetReader > 0) { - return buildParitialLoadPid; - } - for (int id = ipjd.buildNextSpilled(pid); id >= 0; id = ipjd.buildNextSpilled(id + 1)) { - assert buildRFWriters[id].getFileSize() > 0 : "How come a spilled partition have size 0?"; - if (partitalLoad || freeSpace >= buildRFWriters[id].getFileSize()) { - return id; + if (!fbms.isEmpty()) { + join(pReader, probeId, fbms, writer); } + fbms.clear(); } - return -1; } - private void createInMemoryJoiner(int pid) throws HyracksDataException { - inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx, - buildBufferManager.getPartitionFrameBufferManager(pid), imjc, buildRd, probeRd); - } - - private void closeInMemoryJoiner(int pid, IFrameWriter writer) throws HyracksDataException { - joinComparisonCount += inMemJoiner[pid].getComparisonCount(); - joinResultCount += inMemJoiner[pid].getResultCount(); - inMemJoiner[pid].closeJoin(writer); - inMemJoiner[pid] = null; - } - - public void probeInit() throws HyracksDataException { - probeRFWriters = new RunFileWriter[numOfPartitions]; - probeSize = 0; - } - - public void probeStep(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException { - accessorProbe.reset(buffer); - int tupleCount = accessorProbe.getTupleCount(); + private void join(RunFileReader pReader, RunFilePointer rfpStart, List<IFrameBufferManager> buildFbms, + IFrameWriter writer) throws HyracksDataException { + long fileOffsetStart = rfpStart.getFileOffset(); + int tupleStart = rfpStart.getTupleIndex(); - for (int i = 0; i < tupleCount; ++i) { - int pid = probeHpc.partition(accessorProbe, i, k); + RunFilePointer rfpEnd = probeRunFilePointers.higherKey(rfpStart); + long fileOffsetEnd = rfpEnd == null ? pReader.getFileSize() : rfpEnd.getFileOffset(); + int tupleEnd = rfpEnd == null ? Integer.MAX_VALUE : rfpEnd.getTupleIndex(); - if (tmp != pid) { - System.err.println("probeSize: " + probeSize + " pid: " + pid + " k: " + k + " pair: " - + IntervalPartitionUtil.getIntervalPartition(pid, k)); - tmp = pid; - } - - if (!ipjd.hasProbeJoinMap(pid)) { - // Set probe join map - ipjd.setProbeJoinMap(pid, - IntervalPartitionUtil.getProbeJoinPartitions(pid, ipjd.buildPSizeInTups, imjc, k)); - } - - // Tuple has potential match from build phase - if (!ipjd.isProbeJoinMapEmpty(pid)) { - if (ipjd.probeHasSpilled(pid)) { - // pid is Spilled - probeSpillTuple(accessorProbe, i, pid); - } - for (Iterator<Integer> pidIterator = ipjd.getProbeJoinMap(pid); pidIterator.hasNext();) { - // pid has join partitions that are Resident - int j = pidIterator.next(); - if (inMemJoiner[j] != null) { - inMemJoiner[j].join(accessorProbe, i, writer); - } - } - } - ipjd.probeIncrementCount(pid); - probeSize++; - } - } - - /** - * Closes the probe process. - * We do NOT join the spilled partitions here, use {@link joinSpilledPartitions}. - * - * @param writer - * @throws HyracksDataException - */ - public void probeClose(IFrameWriter writer) throws HyracksDataException { - System.err.println("probeSize: " + probeSize); - - for (int i = 0; i < inMemJoiner.length; ++i) { - if (inMemJoiner[i] != null) { - closeInMemoryJoiner(i, writer); - ipjd.buildLogJoined(i); - ipjd.buildRemoveFromJoin(i); - } - } - clearBuildMemory(); - flushProbeSpilledPartition(); - } - - private void probeSpillTuple(IFrameTupleAccessor accessorProbe, int probeTupleIndex, int pid) - throws HyracksDataException { - if (pid != probeRunFilePid) { - flushProbeSpilledPartition(); - probeRunFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE); - probeRunFilePid = pid; - } - if (!probeRunFileAppender.append(accessorProbe, probeTupleIndex)) { - probeRunFileAppender.write(probeRunFileWriter, true); - probeRunFileAppender.append(accessorProbe, probeTupleIndex); - spillWriteCount++; + if (pReader.getReadPointer() != fileOffsetStart) { + pReader.reset(fileOffsetStart); + loadReaderNextFrame(pReader); } - } + do { + int start = pReader.getReadPointer() == fileOffsetStart ? tupleStart : 0; + int end = pReader.getReadPointer() == fileOffsetEnd ? tupleEnd : accessorProbe.getTupleCount(); - public RunFileReader getBuildRFReader(int pid) throws HyracksDataException { - return (buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader(); - } - - public RunFileReader getProbeRFReader(int pid) throws HyracksDataException { - return (probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader(); - } - - public void joinSpilledPartitions(IFrameWriter writer) throws HyracksDataException { - LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap; - while (ipjd.buildGetSpilledCount() > 0) { - // Load back spilled build partitions. - // TODO only load partition required for spill join. Consider both sides. - bringBackSpilledPartitionIfHasMoreMemory(true); - - // Create in memory joiners. - // for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd - // .buildNextInMemoryWithResults(pid + 1)) { - // createInMemoryJoiner(pid); - // } - - probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap(); - - // Join all build partitions with disk probe partitions. - for (Entry<Integer, LinkedHashSet<Integer>> entry : probeInMemoryJoinMap.entrySet()) { - if (entry.getKey() == 221) { - int t = 0; - } - System.err.println(" join pid: " + entry.getKey() + " with : " + probeInMemoryJoinMap); - - if (ipjd.probeGetCount(entry.getKey()) > 0 && !probeInMemoryJoinMap.get(entry.getKey()).isEmpty()) { - joinSpilledProbeWithBuildMemory(writer, probeInMemoryJoinMap, entry.getKey()); - } - } - - // Clean up build memory. - for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd - .buildNextInMemoryWithResults(pid + 1)) { - closeInMemoryJoiner(pid, writer); - if (pid != buildParitialLoadPid) { - ipjd.buildLogJoined(pid); - ipjd.buildRemoveFromJoin(pid); - } else { - int t = 0; + for (int i = start; i < end; ++i) { + // Tuple has potential match from build phase + for (IFrameBufferManager fbm : buildFbms) { + joinTupleWithMemoryPartition(accessorProbe, i, fbm, writer); } } - clearBuildMemory(); - } + } while (pReader.getReadPointer() < fileOffsetEnd && loadReaderNextFrame(pReader)); } - private void joinSpilledProbeWithBuildMemory(IFrameWriter writer, - LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap, int probePid) - throws HyracksDataException { - RunFileReader pReader = getProbeRFReader(probePid); - pReader.open(); - while (pReader.nextFrame(reloadBuffer)) { + private boolean loadReaderNextFrame(RunFileReader pReader) throws HyracksDataException { + if (pReader.nextFrame(reloadBuffer)) { accessorProbe.reset(reloadBuffer.getBuffer()); spillReadCount++; - for (int i = 0; i < accessorProbe.getTupleCount(); ++i) { - // Tuple has potential match from build phase - for (Integer j : probeInMemoryJoinMap.get(probePid)) { - // j has join partitions that are Resident - if (inMemJoiner[j] != null) { - inMemJoiner[j].join(accessorProbe, i, writer); - } - } - } + return true; } - pReader.close(); + return false; } - class IntervalPartitionJoinData { - private LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap; - - private int[] buildPSizeInTups; - private int[] probePSizeInTups; - - private BitSet buildJoinedCompleted; //0=waiting, 1=joined - private BitSet buildSpilledStatus; //0=resident, 1=spilled - private BitSet buildInMemoryStatus; //0=unknown, 1=resident - private BitSet probeSpilledStatus; //0=resident, 1=spilled - - public IntervalPartitionJoinData(int k, IIntervalMergeJoinChecker imjc, int numberOfPartitions) { - probeJoinMap = new LinkedHashMap<>(); - - buildPSizeInTups = new int[numberOfPartitions]; - probePSizeInTups = new int[numberOfPartitions]; - - buildJoinedCompleted = new BitSet(numberOfPartitions); - buildInMemoryStatus = new BitSet(numberOfPartitions); - buildSpilledStatus = new BitSet(numberOfPartitions); - probeSpilledStatus = new BitSet(numberOfPartitions); - } - - public LinkedHashMap<Integer, LinkedHashSet<Integer>> probeGetInMemoryJoinMap() { - return IntervalPartitionUtil.getInMemorySpillJoinMap(probeJoinMap, buildInMemoryStatus, probeSpilledStatus); - } - - public boolean hasProbeJoinMap(int pid) { - return probeJoinMap.containsKey(pid); - } - - public boolean isProbeJoinMapEmpty(int pid) { - return probeJoinMap.get(pid).isEmpty(); - } - - public Iterator<Integer> getProbeJoinMap(int pid) { - return probeJoinMap.get(pid).iterator(); - } - - public void setProbeJoinMap(int pid, LinkedHashSet<Integer> map) { - probeJoinMap.put(new Integer(pid), map); - for (Integer i : map) { - if (buildIsSpilled(i)) { - // Build join partition has spilled. Now spill the probe also. - probeSpilledStatus.set(pid); + public void joinTupleWithMemoryPartition(IFrameTupleAccessor accessorProbe, int probeTupleIndex, + IFrameBufferManager fbm, IFrameWriter writer) throws HyracksDataException { + if (fbm.getNumFrames() == 0) { + return; + } + fbm.resetIterator(); + int frameIndex = fbm.next(); + while (fbm.exists()) { + fbm.getFrame(frameIndex, bufferInfo); + accessorBuild.reset(bufferInfo.getBuffer()); + for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) { + if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, false)) { + appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer); } + joinComparisonCount++; } + frameIndex = fbm.next(); } + } - public void buildClearMemory() { - buildInMemoryStatus.clear(); - } - - public void buildIncrementCount(int pid) { - buildInMemoryStatus.set(pid); - buildPSizeInTups[pid]++; - } - - public int buildGetCount(int pid) { - return buildPSizeInTups[pid]; - } - - public void buildLogJoined(int pid) { - buildSpilledStatus.clear(pid); - buildJoinedCompleted.set(pid); - } - - public void buildRemoveFromJoin(int pid) { - buildSpilledStatus.clear(pid); - buildJoinedCompleted.set(pid); - } - - public boolean buildHasBeenJoined(int pid) { - return buildJoinedCompleted.get(pid); - } - - public int buildGetSpilledCount() { - return buildSpilledStatus.cardinality(); - } - - public void buildSpill(int pid) { - buildInMemoryStatus.clear(pid); - buildSpilledStatus.set(pid); - } - - public void buildLoad(int pid) { - buildInMemoryStatus.set(pid); - buildSpilledStatus.clear(pid); - } - - public boolean buildIsSpilled(int pid) { - return buildSpilledStatus.get(pid); - } - - public int buildNextSpilled(int pid) { - return buildSpilledStatus.nextSetBit(pid); - } - - public int buildNextInMemoryWithResults(int pid) { - int nextPid = buildNextInMemory(pid); - do { - if (nextPid < 0 || buildGetCount(nextPid) > 0) { - return nextPid; - } - nextPid = buildNextInMemory(nextPid + 1); - } while (nextPid >= 0); - return -1; - } + private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe, + int probeSidetIx, IFrameWriter writer) throws HyracksDataException { + FrameUtils.appendConcatToWriter(writer, resultAppender, accessorBuild, buildSidetIx, accessorProbe, + probeSidetIx); + joinResultCount++; + } - public int buildNextInMemory(int pid) { - int nextPid = buildSpilledStatus.nextClearBit(pid); - if (nextPid >= numOfPartitions) { - return -1; + private void fillMemory() throws HyracksDataException { + int buildPid = -1; + TupleStatus ts; + for (ts = loadRightTuple(); ts.isLoaded(); ts = loadRightTuple()) { + int pid = buildHpc.partition(inputAccessor[RIGHT_PARTITION], inputAccessor[RIGHT_PARTITION].getTupleId(), + k); + if (!buildBufferManager.insertTuple(pid, inputAccessor[RIGHT_PARTITION], + inputAccessor[RIGHT_PARTITION].getTupleId(), tempPtr)) { + return; } - do { - if (!buildHasBeenJoined(nextPid)) { - return nextPid; - } - nextPid = buildSpilledStatus.nextClearBit(nextPid + 1); - } while (nextPid >= 0 && nextPid < numOfPartitions); - return -1; - } - - public void probeIncrementCount(int pid) { - probePSizeInTups[pid]++; - } - - public int probeGetCount(int pid) { - return probePSizeInTups[pid]; - } - - public void probeSpill(int pid) { - probeSpilledStatus.set(pid); - } - public boolean probeHasSpilled(int pid) { - return probeSpilledStatus.get(pid); - } - - public int buildGetMaxPartitionSize() { - int max = buildPSizeInTups[0]; - for (int i = 1; i < buildPSizeInTups.length; i++) { - if (buildPSizeInTups[i] > max) { - max = buildPSizeInTups[i]; - } + if (buildPid != pid) { + // Track new partitions in memory. + buildInMemoryPartitions.add(pid); + buildPid = pid; } - return max; + inputAccessor[RIGHT_PARTITION].next(); + buildSize++; } - - public int probeGetMaxPartitionSize() { - int max = probePSizeInTups[0]; - for (int i = 1; i < probePSizeInTups.length; i++) { - if (probePSizeInTups[i] > max) { - max = probePSizeInTups[i]; - } - } - return max; + if (ts.isEmpty()) { + moreBuildProcessing = false; } - } - public void closeAndDeleteRunFiles() throws HyracksDataException { - for (RunFileWriter rfw : buildRFWriters) { - if (rfw != null) { - FileUtils.deleteQuietly(rfw.getFileReference().getFile()); - } - } - for (RunFileWriter rfw : probeRFWriters) { - if (rfw != null) { - FileUtils.deleteQuietly(rfw.getFileReference().getFile()); - } - } - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("IntervalPartitionJoiner statitics: " + joinComparisonCount + " comparisons, " - + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read."); + private TupleStatus loadRightTuple() throws HyracksDataException { + TupleStatus loaded = loadMemoryTuple(RIGHT_PARTITION); + if (loaded == TupleStatus.UNKNOWN) { + loaded = pauseAndLoadRightTuple(); } + return loaded; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java index 453287d..671c082 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java @@ -18,14 +18,6 @@ */ package org.apache.asterix.runtime.operators.joins.intervalpartition; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map.Entry; - -import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.api.dataflow.value.IRangeMap; @@ -109,27 +101,6 @@ public class IntervalPartitionUtil { return (k * k + k) / 2; } - public static void printJoinPartitionMap(ArrayList<HashSet<Integer>> partitionMap) { - for (int i = 0; i < partitionMap.size(); ++i) { - System.out.print("(hashset) Partition " + i + " must join with partition(s): "); - for (Integer map : partitionMap.get(i)) { - System.out.print(map + " "); - } - System.out.println(""); - } - } - - public static void printPartitionMap(int k) { - for (int i = 0; i < k; ++i) { - for (int j = i; j < k; ++j) { - int pid = intervalPartitionMap(i, j, k); - Pair<Integer, Integer> partition = getIntervalPartition(pid, k); - System.out.println("Map partition (" + i + ", " + j + ") to partition id: " + pid + " back to pair (" - + partition.first + ", " + partition.second + ")"); - } - } - } - /** * Map the partition start and end points to a single value. * The mapped partitions are sorted in interval starting at 0. @@ -192,47 +163,6 @@ public class IntervalPartitionUtil { return partitionEnd; } - public static LinkedHashSet<Integer> getProbeJoinPartitions(int pid, int[] buildPSizeInTups, - IIntervalMergeJoinChecker imjc, int k) { - LinkedHashSet<Integer> joinMap = new LinkedHashSet<>(); - Pair<Integer, Integer> map = getIntervalPartition(pid, k); - int probeStart = map.first; - int probeEnd = map.second; - // Build partitions with data - for (int buildStart = 0; buildStart < k; ++buildStart) { - for (int buildEnd = k - 1; buildStart <= buildEnd; --buildEnd) { - int buildId = intervalPartitionMap(buildStart, buildEnd, k); - if (buildPSizeInTups[buildId] > 0) { - // Join partitions for probe's pid - if (!(buildStart == 0 && probeStart == 0) - && imjc.compareIntervalPartition(buildStart, buildEnd, probeStart, probeEnd)) { - joinMap.add(buildId); - } - } - } - } - return joinMap; - } - - public static LinkedHashMap<Integer, LinkedHashSet<Integer>> getInMemorySpillJoinMap( - LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap, BitSet buildInMemoryStatus, - BitSet probeSpilledStatus) { - LinkedHashMap<Integer, LinkedHashSet<Integer>> inMemoryMap = new LinkedHashMap<>(); - for (Entry<Integer, LinkedHashSet<Integer>> entry : probeJoinMap.entrySet()) { - if (probeSpilledStatus.get(entry.getKey())) { - for (Integer i : entry.getValue()) { - if (buildInMemoryStatus.get(i)) { - if (!inMemoryMap.containsKey(entry.getKey())) { - inMemoryMap.put(entry.getKey(), new LinkedHashSet<Integer>()); - } - inMemoryMap.get(entry.getKey()).add(i); - } - } - } - } - return inMemoryMap; - } - public static long getPartitionDuration(long partitionStart, long partitionEnd, int k) throws HyracksDataException { if (k <= 2) { throw new HyracksDataException("k is to small for interval partitioner.");