// Recovered from the deleted-file diff published at
// http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.asterix.runtime.operators.joins.intervalpartition2;

import java.nio.ByteBuffer;
import java.util.logging.Logger;

import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionComputerFactory;
import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.RangeId;
import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage;
import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;

/**
 * Operator descriptor for the interval partition join. It is registered with two inputs and one
 * output and contributes two activities:
 * <ul>
 * <li>a left activity that consumes input 0 and writes the join output, and</li>
 * <li>a right activity that consumes input 1 and produces no output of its own.</li>
 * </ul>
 * For every partition the two activities share one {@link IntervalPartitionJoinTaskState} (published
 * through the task context by the left side) and hand control back and forth using the lock and the
 * left/right conditions of a shared {@link MergeJoinLocks}.
 */
public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDescriptor {
    private static final long serialVersionUID = 1L;

    private static final int LEFT_ACTIVITY_ID = 0;
    private static final int RIGHT_ACTIVITY_ID = 1;
    private final int[] leftKeys;
    private final int[] rightKeys;
    private final int memoryForJoin;
    private final IIntervalMergeJoinCheckerFactory imjcf;
    private final RangeId rangeId;
    private final int k;

    // NOTE(review): buildKey comes from leftKeys and probeKey from rightKeys, yet the partitioners
    // built from them are handed to IntervalPartitionJoiner, which applies the build partitioner to
    // the right input and the probe partitioner to the left input. Confirm the pairing is intended.
    private final int probeKey;
    private final int buildKey;

    private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());

    /**
     * @param spec
     *            registry this descriptor is added to
     * @param memoryForJoin
     *            memory budget passed to the joiner
     * @param k
     *            interval partitioning parameter stored in the task state and passed to the
     *            partition computers and the joiner
     * @param leftKeys
     *            key fields of input 0; only the first entry is used for partitioning
     * @param rightKeys
     *            key fields of input 1; only the first entry is used for partitioning
     * @param recordDescriptor
     *            descriptor of the single output
     * @param imjcf
     *            factory for the interval join checker
     * @param rangeId
     *            id used to look up the range map in the task context
     */
    public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memoryForJoin, int k,
            int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
            RangeId rangeId) {
        super(spec, 2, 1);
        recordDescriptors[0] = recordDescriptor;
        this.buildKey = leftKeys[0];
        this.probeKey = rightKeys[0];
        this.k = k;
        this.leftKeys = leftKeys;
        this.rightKeys = rightKeys;
        this.memoryForJoin = memoryForJoin;
        this.imjcf = imjcf;
        this.rangeId = rangeId;
    }

    /**
     * Registers the right activity (fed by input 1) and the left activity (fed by input 0, feeding
     * output 0). Both activities receive the same {@link MergeJoinLocks} instance.
     */
    @Override
    public void contributeActivities(IActivityGraphBuilder builder) {
        MergeJoinLocks locks = new MergeJoinLocks();

        ActivityId leftAid = new ActivityId(odId, LEFT_ACTIVITY_ID);
        ActivityId rightAid = new ActivityId(odId, RIGHT_ACTIVITY_ID);

        IActivity leftAN = new LeftJoinerActivityNode(leftAid, rightAid, locks);
        IActivity rightAN = new RightDataActivityNode(rightAid, leftAid, locks);

        builder.addActivity(this, rightAN);
        builder.addSourceEdge(1, rightAN, 0);

        builder.addActivity(this, leftAN);
        builder.addSourceEdge(0, leftAN, 0);
        builder.addTargetEdge(0, leftAN, 0);
    }

    /** Activity for input 0: creates the shared task state and drives the joiner. */
    private class LeftJoinerActivityNode extends AbstractActivityNode {
        private static final long serialVersionUID = 1L;

        private final MergeJoinLocks locks;

        /**
         * @param id
         *            id of this activity
         * @param joinAid
         *            id of the right activity; accepted for symmetry with
         *            {@link RightDataActivityNode} but not used here
         * @param locks
         *            lock set shared with the right activity
         */
        public LeftJoinerActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
            super(id);
            this.locks = locks;
        }

        @Override
        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
                throws HyracksDataException {
            locks.setPartitions(nPartitions);
            final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
            return new LeftJoinerOperator(ctx, partition, inRecordDesc);
        }

        private class LeftJoinerOperator extends AbstractUnaryInputUnaryOutputOperatorNodePushable {

            private final IHyracksTaskContext ctx;
            private final int partition;
            private final RecordDescriptor leftRd;
            private IntervalPartitionJoinTaskState state;
            private boolean first = true;

            public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
                this.ctx = ctx;
                this.partition = partition;
                this.leftRd = inRecordDesc;
            }

            /**
             * Opens the output writer, publishes a fresh task state in the task context, wakes the
             * right branch, and then waits until the right branch has installed the joiner.
             */
            @Override
            public void open() throws HyracksDataException {
                locks.getLock(partition).lock();
                try {
                    writer.open();
                    state = new IntervalPartitionJoinTaskState(ctx.getJobletContext().getJobId(),
                            new TaskId(getActivityId(), partition));
                    state.leftRd = leftRd;
                    ctx.setStateObject(state);
                    locks.getRight(partition).signal();

                    // Continue only after the joiner has been created in the right branch.
                    while (state.partitionJoiner == null) {
                        locks.getLeft(partition).await();
                    }
                    state.status.branch[LEFT_ACTIVITY_ID].setStageOpen();
                    locks.getRight(partition).signal();
                } catch (InterruptedException e) {
                    // NOTE(review): the interrupt flag is restored but open() still returns normally,
                    // possibly with no joiner installed; later calls would then fail on a null joiner.
                    Thread.currentThread().interrupt();
                } finally {
                    locks.getLock(partition).unlock();
                }
            }

            /** Hands the frame to the joiner and lets it process the left input. */
            @Override
            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                locks.getLock(partition).lock();
                try {
                    // Inside the try so the lock is released even if the status update throws.
                    if (first) {
                        state.status.branch[LEFT_ACTIVITY_ID].setStageData();
                        first = false;
                    }
                    state.partitionJoiner.setFrame(LEFT_ACTIVITY_ID, buffer);
                    state.partitionJoiner.processLeftFrame(writer);
                } finally {
                    locks.getLock(partition).unlock();
                }
            }

            /** Records the failure; the output writer is notified from {@link #close()}. */
            @Override
            public void fail() throws HyracksDataException {
                locks.getLock(partition).lock();
                try {
                    state.failed = true;
                } finally {
                    locks.getLock(partition).unlock();
                }
            }

            /**
             * Finishes the left branch: either forwards the recorded failure or runs the join, then
             * always closes the output writer, marks the left branch closed and wakes the right
             * branch so it cannot be left waiting.
             */
            @Override
            public void close() throws HyracksDataException {
                locks.getLock(partition).lock();
                try {
                    state.status.branch[LEFT_ACTIVITY_ID].noMore();
                    try {
                        if (state.failed) {
                            writer.fail();
                        } else {
                            state.partitionJoiner.processLeftClose(writer);
                        }
                    } finally {
                        // The writer was opened in open(), so it must be closed on every path.
                        writer.close();
                    }
                } finally {
                    try {
                        state.status.branch[LEFT_ACTIVITY_ID].setStageClose();
                        locks.getRight(partition).signal();
                    } finally {
                        locks.getLock(partition).unlock();
                    }
                }
            }
        }
    }

    /** Activity for input 1: builds the joiner and feeds right-side frames to it on request. */
    private class RightDataActivityNode extends AbstractActivityNode {
        private static final long serialVersionUID = 1L;

        private final ActivityId joinAid;
        private final MergeJoinLocks locks;

        /**
         * @param id
         *            id of this activity
         * @param joinAid
         *            id of the left activity, used to look up the shared task state
         * @param locks
         *            lock set shared with the left activity
         */
        public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
            super(id);
            this.joinAid = joinAid;
            this.locks = locks;
        }

        @Override
        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
                throws HyracksDataException {
            locks.setPartitions(nPartitions);
            RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
            return new RightDataOperator(ctx, partition, inRecordDesc);
        }

        private class RightDataOperator extends AbstractUnaryInputSinkOperatorNodePushable {

            private final int partition;
            private final IHyracksTaskContext ctx;
            private final RecordDescriptor rightRd;
            private IntervalPartitionJoinTaskState state;
            private boolean first = true;

            public RightDataOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
                this.ctx = ctx;
                this.partition = partition;
                this.rightRd = inRecordDesc;
            }

            /**
             * Waits for the left branch to publish the task state, then creates the partition
             * computers, the join checker and the joiner, and wakes the left branch.
             */
            @Override
            public void open() throws HyracksDataException {
                locks.getLock(partition).lock();
                try {
                    // Wait for the state to be set in the context from the left branch.
                    state = (IntervalPartitionJoinTaskState) ctx.getStateObject(new TaskId(joinAid, partition));
                    while (state == null) {
                        locks.getRight(partition).await();
                        state = (IntervalPartitionJoinTaskState) ctx.getStateObject(new TaskId(joinAid, partition));
                    }
                    state.k = k;

                    RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
                    long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(),
                            partition);
                    long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
                    ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k,
                            partitionStart, partitionEnd).createPartitioner();
                    ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k,
                            partitionStart, partitionEnd).createPartitioner();
                    IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, ctx);

                    state.rightRd = rightRd;
                    state.partitionJoiner = new IntervalPartitionJoiner(ctx, memoryForJoin, partition, state.k,
                            state.status, locks, imjc, state.leftRd, state.rightRd, buildHpc, probeHpc);
                    state.status.branch[RIGHT_ACTIVITY_ID].setStageOpen();
                    locks.getLeft(partition).signal();
                } catch (InterruptedException e) {
                    // NOTE(review): as on the left side, open() returns normally after an interrupt,
                    // possibly without a state or joiner in place.
                    Thread.currentThread().interrupt();
                } finally {
                    locks.getLock(partition).unlock();
                }
            }

            /**
             * Blocks until the joiner asks for more right-side data (or the left branch has closed),
             * then installs the frame as the current right input and wakes the left branch.
             */
            @Override
            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                locks.getLock(partition).lock();
                try {
                    // Inside the try so the lock is released even if the status update throws.
                    if (first) {
                        state.status.branch[RIGHT_ACTIVITY_ID].setStageData();
                        first = false;
                    }
                    while (!state.status.continueRightLoad
                            && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) {
                        // Wait for the state to request a right frame unless left has finished.
                        locks.getRight(partition).await();
                    }
                    state.partitionJoiner.setFrame(RIGHT_ACTIVITY_ID, buffer);
                    state.status.continueRightLoad = false;
                    locks.getLeft(partition).signal();
                } catch (InterruptedException e) {
                    // NOTE(review): an interrupt while waiting drops this frame without reporting it.
                    Thread.currentThread().interrupt();
                } finally {
                    locks.getLock(partition).unlock();
                }
            }

            /** Records the failure in the shared state and wakes the left branch. */
            @Override
            public void fail() throws HyracksDataException {
                locks.getLock(partition).lock();
                try {
                    state.failed = true;
                    locks.getLeft(partition).signal();
                } finally {
                    locks.getLock(partition).unlock();
                }
            }

            /** Marks the right branch closed and wakes the left branch. */
            @Override
            public void close() throws HyracksDataException {
                locks.getLock(partition).lock();
                try {
                    state.status.branch[RIGHT_ACTIVITY_ID].setStageClose();
                    locks.getLeft(partition).signal();
                } finally {
                    locks.getLock(partition).unlock();
                }
            }
        }
    }
}
// Recovered from the deleted-file diff published at
// http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.asterix.runtime.operators.joins.intervalpartition2;

import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.dataflow.std.join.MergeJoinTaskState;

/**
 * Task state shared by the two branches of the interval partition join. On top of what
 * {@link MergeJoinTaskState} provides it carries the joiner instance and the partitioning
 * parameter {@code k}.
 */
public class IntervalPartitionJoinTaskState extends MergeJoinTaskState {
    /** Joiner for this partition; {@code null} until a branch assigns it. */
    protected IntervalPartitionJoiner partitionJoiner;
    /** Interval partitioning parameter. */
    public int k;

    public IntervalPartitionJoinTaskState(JobId jobId, TaskId taskId) {
        super(jobId, taskId);
    }

}

// Recovered from the deleted-file diff published at
// http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.asterix.runtime.operators.joins.intervalpartition2;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.io.RunFileReader;
import org.apache.hyracks.dataflow.common.io.RunFileWriter;
import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
import org.apache.hyracks.dataflow.std.join.AbstractMergeJoiner;
import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
import org.apache.hyracks.dataflow.std.join.MergeStatus;
import org.apache.hyracks.dataflow.std.structures.RunFilePointer;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;

/**
 * Joiner for the interval partition join.
 * <p>
 * Left-side (probe) frames are appended to a run file while the interval partition of each tuple is
 * tracked: every time the partition id changes, the file offset and tuple index at which the new
 * partition starts are recorded. When the left input is closed, right-side (build) tuples are loaded
 * into a partitioned in-memory buffer until it is full, the run file is scanned against the
 * partitions currently in memory, the memory is cleared, and the cycle repeats until the right input
 * is exhausted.
 */
public class IntervalPartitionJoiner extends AbstractMergeJoiner {

    private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName());

    private final RunFileWriter probeRunFileWriter;
    private int probeRunFilePid = -1;

    private final ITuplePartitionComputer buildHpc;
    private final ITuplePartitionComputer probeHpc;

    private final int buildMemory;
    private final int k;
    private final int numOfPartitions;
    private long buildSize = 0;
    private long probeSize = 0;
    /** Start position in the run file of each probe segment, mapped to that segment's partition id. */
    private final TreeMap<RunFilePointer, Integer> probeRunFilePointers;

    private final VPartitionTupleBufferManager buildBufferManager;
    private final TuplePointer tempPtr = new TuplePointer();
    /** Partition ids that currently hold build tuples in memory (no duplicates). */
    private final List<Integer> buildInMemoryPartitions;
    private final FrameTupleAccessor accessorBuild;
    private final BufferInfo bufferInfo;

    private long spillWriteCount = 0;
    private long spillReadCount = 0;
    private long joinComparisonCount = 0;
    private long joinResultCount = 0;
    private final IIntervalMergeJoinChecker imjc;
    private final FrameTupleAccessor accessorProbe;
    private final IFrame reloadBuffer;
    private boolean moreBuildProcessing = true;
    private final List<IFrameBufferManager> fbms = new ArrayList<>();

    /**
     * @param ctx
     *            task context used for frames, workspace files and I/O
     * @param memorySize
     *            build memory budget in frames (multiplied by the initial frame size)
     * @param partition
     *            partition this joiner works on
     * @param k
     *            interval partitioning parameter
     * @param status
     *            merge status shared with the operator branches
     * @param locks
     *            locks shared with the operator branches
     * @param imjc
     *            checker deciding which partitions and tuples join
     * @param leftRd
     *            record descriptor of the left (probe) input
     * @param rightRd
     *            record descriptor of the right (build) input
     * @param buildHpc
     *            partition computer applied to right-side tuples
     * @param probeHpc
     *            partition computer applied to left-side tuples
     * @throws HyracksDataException
     *             if the frame, buffer manager or run file cannot be created
     */
    public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memorySize, int partition, int k, MergeStatus status,
            MergeJoinLocks locks, IIntervalMergeJoinChecker imjc, RecordDescriptor leftRd, RecordDescriptor rightRd,
            ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) throws HyracksDataException {
        super(ctx, partition, status, locks, leftRd, rightRd);

        bufferInfo = new BufferInfo(null, -1, -1);

        this.accessorProbe = new FrameTupleAccessor(leftRd);
        reloadBuffer = new VSizeFrame(ctx);

        this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);
        this.imjc = imjc;

        // TODO fix available memory size
        this.buildMemory = memorySize;
        buildBufferManager = new VPartitionTupleBufferManager(ctx, VPartitionTupleBufferManager.NO_CONSTRAIN,
                numOfPartitions, buildMemory * ctx.getInitialFrameSize());

        this.k = k;
        this.buildHpc = buildHpc;
        this.probeHpc = probeHpc;

        // NOTE(review): the writer is opened here and never explicitly closed in this class before a
        // reader is created from it; confirm that is acceptable for RunFileWriter.
        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile("IntervalPartitionJoiner");
        probeRunFileWriter = new RunFileWriter(file, ctx.getIOManager());
        probeRunFileWriter.open();

        probeRunFilePointers = new TreeMap<>(RunFilePointer.ASC);
        buildInMemoryPartitions = new LinkedList<>();

        this.accessorBuild = new FrameTupleAccessor(rightRd);

        // The logger level is left to the logging configuration; it is not forced from here.
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("IntervalPartitionJoiner has started partition " + partition + " with " + memorySize
                    + " frames of memory.");
        }
    }

    /**
     * Records where each run of equal partition ids starts in the current left frame and appends
     * the frame to the probe run file. Nothing is written to {@code writer} at this stage.
     */
    @Override
    public void processLeftFrame(IFrameWriter writer) throws HyracksDataException {
        while (inputAccessor[LEFT_PARTITION].exists()) {
            int pid = probeHpc.partition(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), k);

            if (probeRunFilePid != pid) {
                // Log new partition locations.
                RunFilePointer rfp = new RunFilePointer(probeRunFileWriter.getFileSize(),
                        inputAccessor[LEFT_PARTITION].getTupleId());
                probeRunFilePointers.put(rfp, pid);
                probeRunFilePid = pid;
            }
            inputAccessor[LEFT_PARTITION].next();
            probeSize++;
        }
        inputBuffer[LEFT_PARTITION].rewind();
        probeRunFileWriter.nextFrame(inputBuffer[LEFT_PARTITION]);
        spillWriteCount++;
    }

    /** Runs the join over the spilled left input, flushes pending results and logs the counters. */
    @Override
    public void processLeftClose(IFrameWriter writer) throws HyracksDataException {
        joinLoopOnMemory(writer);

        // Flush result.
        resultAppender.write(writer, true);
        if (LOGGER.isLoggable(Level.WARNING)) {
            LOGGER.warning("IntervalPartitionJoiner statitics: " + k + " k, " + joinComparisonCount + " comparisons, "
                    + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read.");
        }
    }

    /**
     * Repeatedly fills the build memory from the right input and joins it with the probe run file
     * until the right input is exhausted. The run file reader is closed on every exit path.
     */
    private void joinLoopOnMemory(IFrameWriter writer) throws HyracksDataException {
        RunFileReader pReader = probeRunFileWriter.createDeleteOnCloseReader();
        pReader.open();
        try {
            // Load first frame.
            loadReaderNextFrame(pReader);

            while (moreBuildProcessing) {
                fillMemory();
                joinMemoryBlockWithRunFile(writer, pReader);

                // Clean up
                for (int pid : buildInMemoryPartitions) {
                    buildBufferManager.clearPartition(pid);
                }
                buildInMemoryPartitions.clear();
            }
        } finally {
            pReader.close();
        }
    }

    /**
     * For every probe segment in the run file, collects the in-memory build partitions the checker
     * accepts for that segment's partition and joins the segment against them.
     */
    private void joinMemoryBlockWithRunFile(IFrameWriter writer, RunFileReader pReader) throws HyracksDataException {
        // Join Disk partitions with Memory partitions
        for (RunFilePointer probeId : probeRunFilePointers.navigableKeySet()) {
            Pair<Integer, Integer> probe = IntervalPartitionUtil.getIntervalPartition(probeRunFilePointers.get(probeId),
                    k);
            for (int buildId : buildInMemoryPartitions) {
                Pair<Integer, Integer> build = IntervalPartitionUtil.getIntervalPartition(buildId, k);
                if (imjc.compareIntervalPartition(probe.first, probe.second, build.first, build.second)) {
                    fbms.add(buildBufferManager.getPartitionFrameBufferManager(buildId));
                }
            }
            if (!fbms.isEmpty()) {
                join(pReader, probeId, fbms, writer);
            }
            fbms.clear();
        }
    }

    /**
     * Joins the probe segment starting at {@code rfpStart} (and ending where the next recorded
     * segment starts, or at the end of the file) with the given build partitions.
     */
    private void join(RunFileReader pReader, RunFilePointer rfpStart, List<IFrameBufferManager> buildFbms,
            IFrameWriter writer) throws HyracksDataException {
        long fileOffsetStart = rfpStart.getFileOffset();
        int tupleStart = rfpStart.getTupleIndex();

        RunFilePointer rfpEnd = probeRunFilePointers.higherKey(rfpStart);
        long fileOffsetEnd = rfpEnd == null ? pReader.getFileSize() : rfpEnd.getFileOffset();
        int tupleEnd = rfpEnd == null ? Integer.MAX_VALUE : rfpEnd.getTupleIndex();

        if (pReader.getReadPointer() != fileOffsetStart) {
            pReader.reset(fileOffsetStart);
            loadReaderNextFrame(pReader);
        }
        do {
            int tupleCount = accessorProbe.getTupleCount();
            int start = pReader.getReadPointer() == fileOffsetStart ? tupleStart : 0;
            // Never run past the tuples actually present in the frame: for the last segment
            // tupleEnd is Integer.MAX_VALUE.
            int end = pReader.getReadPointer() == fileOffsetEnd ? Math.min(tupleEnd, tupleCount) : tupleCount;

            for (int i = start; i < end; ++i) {
                // Tuple has potential match from build phase
                for (IFrameBufferManager fbm : buildFbms) {
                    joinTupleWithMemoryPartition(accessorProbe, i, fbm, writer);
                }
            }
        } while (pReader.getReadPointer() < fileOffsetEnd && loadReaderNextFrame(pReader));
    }

    /**
     * Reads the next run-file frame into the reload buffer and points the probe accessor at it.
     *
     * @return {@code true} if a frame was read, {@code false} if the reader had none
     */
    private boolean loadReaderNextFrame(RunFileReader pReader) throws HyracksDataException {
        if (pReader.nextFrame(reloadBuffer)) {
            accessorProbe.reset(reloadBuffer.getBuffer());
            spillReadCount++;
            return true;
        }
        return false;
    }

    /**
     * Compares one probe tuple with every build tuple held by {@code fbm} and appends each pair the
     * checker accepts to the result.
     */
    public void joinTupleWithMemoryPartition(IFrameTupleAccessor accessorProbe, int probeTupleIndex,
            IFrameBufferManager fbm, IFrameWriter writer) throws HyracksDataException {
        if (fbm.getNumFrames() == 0) {
            return;
        }
        fbm.resetIterator();
        int frameIndex = fbm.next();
        while (fbm.exists()) {
            fbm.getFrame(frameIndex, bufferInfo);
            accessorBuild.reset(bufferInfo.getBuffer());
            for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) {
                if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, false)) {
                    appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer);
                }
                joinComparisonCount++;
            }
            frameIndex = fbm.next();
        }
    }

    /** Appends the concatenation of the build tuple and the probe tuple to the result. */
    private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe,
            int probeSidetIx, IFrameWriter writer) throws HyracksDataException {
        FrameUtils.appendConcatToWriter(writer, resultAppender, accessorBuild, buildSidetIx, accessorProbe,
                probeSidetIx);
        joinResultCount++;
    }

    /**
     * Loads right-side tuples into the build buffer until it refuses a tuple or the right input is
     * exhausted. A refused tuple stays current in the input accessor so the next round starts with it.
     *
     * @throws HyracksDataException
     *             if loading fails, or if the very first tuple of a round does not fit into the
     *             build memory (the join could otherwise never make progress)
     */
    private void fillMemory() throws HyracksDataException {
        int buildPid = -1;
        boolean loadedAny = false;
        TupleStatus ts;
        for (ts = loadRightTuple(); ts.isLoaded(); ts = loadRightTuple()) {
            int pid = buildHpc.partition(inputAccessor[RIGHT_PARTITION], inputAccessor[RIGHT_PARTITION].getTupleId(),
                    k);
            if (!buildBufferManager.insertTuple(pid, inputAccessor[RIGHT_PARTITION],
                    inputAccessor[RIGHT_PARTITION].getTupleId(), tempPtr)) {
                if (!loadedAny) {
                    throw new HyracksDataException(
                            "Interval partition join build memory cannot hold a single tuple (partition " + pid
                                    + ", " + buildMemory + " frames).");
                }
                return;
            }
            loadedAny = true;

            if (buildPid != pid) {
                // Track new partitions in memory; a partition that shows up again after another one
                // must not be listed twice or it would be joined twice.
                if (!buildInMemoryPartitions.contains(pid)) {
                    buildInMemoryPartitions.add(pid);
                }
                buildPid = pid;
            }
            inputAccessor[RIGHT_PARTITION].next();
            buildSize++;
        }
        if (ts.isEmpty()) {
            moreBuildProcessing = false;
        }
    }

    /** Loads the next right tuple, pausing for a new right frame when the status is unknown. */
    private TupleStatus loadRightTuple() throws HyracksDataException {
        TupleStatus loaded = loadMemoryTuple(RIGHT_PARTITION);
        if (loaded == TupleStatus.UNKNOWN) {
            loaded = pauseAndLoadRightTuple();
        }
        return loaded;
    }

}