snapshot with logging
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/1df9a9c7 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/1df9a9c7 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/1df9a9c7 Branch: refs/heads/ecarm002/interval_join_merge Commit: 1df9a9c7db4b686e75b870ee02080f8fe15ca666 Parents: 1fae6ac Author: Preston Carman <prest...@apache.org> Authored: Tue Sep 6 15:51:12 2016 -0700 Committer: Preston Carman <prest...@apache.org> Committed: Tue Sep 6 15:51:12 2016 -0700 ---------------------------------------------------------------------- .../joins/AbstractIntervalMergeJoinChecker.java | 6 +- .../IntervalIndexJoinOperatorDescriptor.java | 6 +- .../intervalindex/IntervalIndexJoiner.java | 86 +++++++++++++------- .../dataflow/std/join/AbstractMergeJoiner.java | 24 +++--- .../hyracks/dataflow/std/join/IMergeJoiner.java | 2 + .../std/join/MergeJoinOperatorDescriptor.java | 18 +++- .../hyracks/dataflow/std/join/MergeJoiner.java | 80 +++++++++++++----- .../dataflow/std/join/NestedLoopJoin.java | 8 +- .../dataflow/std/join/RunFileStream.java | 9 +- 9 files changed, 162 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java index b461799..ec8ecda 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java @@ -69,7 +69,7 @@ public abstract class AbstractIntervalMergeJoinChecker implements IIntervalMerge IntervalJoinUtil.getIntervalPointable(accessorRight, idRight, tvp, ipRight); ipLeft.getEnd(endLeft); ipRight.getStart(startRight); - return ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), endLeft, startRight) >= 0; + return ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), endLeft, startRight) > 0; } @Override @@ -79,7 +79,7 @@ public abstract class AbstractIntervalMergeJoinChecker implements IIntervalMerge IntervalJoinUtil.getIntervalPointable(accessorRight, idRight, tvp, ipRight); ipLeft.getStart(startLeft); ipRight.getEnd(endRight); - return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), startLeft, endRight) <= 0); + return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), startLeft, endRight) < 0); } @Override @@ -102,7 +102,7 @@ public abstract class AbstractIntervalMergeJoinChecker implements IIntervalMerge IntervalJoinUtil.getIntervalPointable(accessorRight, rightTupleIndex, idRight, tvp, ipRight); ipLeft.getStart(startLeft); ipRight.getEnd(endRight); - return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), startLeft, endRight) <= 0); + return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), startLeft, endRight) < 0); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java index be44df3..d84fabc 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java @@ -37,6 +37,7 @@ import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; +import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage; import org.apache.hyracks.dataflow.std.join.MergeJoinLocks; public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescriptor { @@ -191,7 +192,7 @@ public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescrip private static final long serialVersionUID = 1L; private final ActivityId joinAid; - private MergeJoinLocks locks; + private final MergeJoinLocks locks; public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) { super(id); @@ -253,7 +254,8 @@ public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescrip first = false; } try { - while (!state.status.continueRightLoad && state.status.branch[LEFT_ACTIVITY_ID].hasMore()) { + while (!state.status.continueRightLoad + && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) { // Wait for the state to request right frame unless left has finished. locks.getRight(partition).await(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java index e4c4cbe..965411b 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java @@ -74,8 +74,11 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { private long joinComparisonCount = 0; private long joinResultCount = 0; private long spillCount = 0; - private long spillReadCount = 0; - private long spillWriteCount = 0; + private long leftSpillCount = 0; + private long rightSpillCount = 0; + private long[] spillFileCount = { 0, 0 }; + private long[] spillReadCount = { 0, 0 }; + private long[] spillWriteCount = { 0, 0 }; public IntervalIndexJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status, MergeJoinLocks locks, Comparator<EndPointIndexItem> endPointComparator, @@ -109,10 +112,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { memoryAccessor[RIGHT_PARTITION] = bufferManager.getTuplePointerAccessor(rightRd); activeManager = new ActiveSweepManager[JOIN_PARTITIONS]; - activeManager[LEFT_PARTITION] = new ActiveSweepManager(bufferManager, leftKey, LEFT_PARTITION, - endPointComparator); - activeManager[RIGHT_PARTITION] = new ActiveSweepManager(bufferManager, rightKey, RIGHT_PARTITION, - endPointComparator); + activeManager[LEFT_PARTITION] = new ActiveSweepManager(bufferManager, leftKey, LEFT_PARTITION, endPointComparator); + activeManager[RIGHT_PARTITION] = new ActiveSweepManager(bufferManager, rightKey, RIGHT_PARTITION, endPointComparator); // Run files for both branches runFileStream = new RunFileStream[JOIN_PARTITIONS]; @@ -143,9 +144,12 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { runFileStream[LEFT_PARTITION].close(); runFileStream[RIGHT_PARTITION].close(); if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("IntervalIndexJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount - + " results, " + spillCount + " spills, " + spillWriteCount + " spill frames written, " - + spillReadCount + " spill frames read."); + LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount + + " results, left[" + leftSpillCount + " spills, " + runFileStream[LEFT_PARTITION].getFileCount() + " files, " + + runFileStream[LEFT_PARTITION].getWriteCount() + " written, " + runFileStream[LEFT_PARTITION].getReadCount() + + " read]. right[" + rightSpillCount + " spills, " + runFileStream[RIGHT_PARTITION].getFileCount() + + " files, " + runFileStream[RIGHT_PARTITION].getWriteCount() + " written, " + + runFileStream[RIGHT_PARTITION].getReadCount() + " read]."); } } @@ -237,11 +241,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { long leftStart = IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey); long rightStart = IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey); if (leftStart < rightStart) { - return activeManager[RIGHT_PARTITION].hasRecords() - && activeManager[RIGHT_PARTITION].getTopPoint() < leftStart; + return activeManager[RIGHT_PARTITION].hasRecords() && activeManager[RIGHT_PARTITION].getTopPoint() < leftStart; } else { - return !(activeManager[LEFT_PARTITION].hasRecords() - && activeManager[LEFT_PARTITION].getTopPoint() < rightStart); + return !(activeManager[LEFT_PARTITION].hasRecords() && activeManager[LEFT_PARTITION].getTopPoint() < rightStart); } } @@ -258,8 +260,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { if (checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey), sweep) || !imjc.checkToRemoveRightActive()) { // Add individual tuples. - processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], - inputAccessor[LEFT_PARTITION], true, writer); + processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], inputAccessor[LEFT_PARTITION], true, + writer); runFileStream[LEFT_PARTITION].addToRunFile(inputAccessor[LEFT_PARTITION]); inputAccessor[LEFT_PARTITION].next(); ts = loadLeftTuple(); @@ -276,7 +278,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { // Memory is empty and we can start processing the run file. if (activeManager[RIGHT_PARTITION].isEmpty() || ts.isEmpty()) { - unfreezeAndContinue(LEFT_PARTITION, inputAccessor[LEFT_PARTITION], RIGHT_PARTITION); + unfreezeAndContinue(LEFT_PARTITION, inputAccessor[LEFT_PARTITION]); ts = loadLeftTuple(); } return ts; @@ -291,8 +293,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { if (checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey), sweep) || !imjc.checkToRemoveLeftActive()) { // Add individual tuples. - processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], - inputAccessor[RIGHT_PARTITION], false, writer); + processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION], false, + writer); runFileStream[RIGHT_PARTITION].addToRunFile(inputAccessor[RIGHT_PARTITION]); inputAccessor[RIGHT_PARTITION].next(); ts = loadRightTuple(); @@ -309,7 +311,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { // Memory is empty and we can start processing the run file. if (!activeManager[LEFT_PARTITION].hasRecords() || ts.isEmpty()) { - unfreezeAndContinue(RIGHT_PARTITION, inputAccessor[RIGHT_PARTITION], LEFT_PARTITION); + unfreezeAndContinue(RIGHT_PARTITION, inputAccessor[RIGHT_PARTITION]); ts = loadRightTuple(); } return ts; @@ -318,9 +320,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { private void processLeftTuple(IFrameWriter writer) throws HyracksDataException { // Process endpoints do { - if ((!activeManager[LEFT_PARTITION].hasRecords() - || checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey), - activeManager[LEFT_PARTITION].getTopPoint())) + if ((!activeManager[LEFT_PARTITION].hasRecords() || checkToProcessAdd( + IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey), activeManager[LEFT_PARTITION].getTopPoint())) || !imjc.checkToRemoveLeftActive()) { // Add to active, end point index and buffer. TuplePointer tp = new TuplePointer(); @@ -340,8 +341,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { // Add Results if (!buffer.isEmpty()) { - processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer, - memoryAccessor[LEFT_PARTITION], true, writer); + processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer, memoryAccessor[LEFT_PARTITION], + true, writer); } } @@ -370,8 +371,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { // Add Results if (!buffer.isEmpty()) { - processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer, - memoryAccessor[RIGHT_PARTITION], false, writer); + processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer, memoryAccessor[RIGHT_PARTITION], + false, writer); } } @@ -417,6 +418,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { + bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: " + bufferManager.getNumTuples(RIGHT_PARTITION) + ")"); } + rightSpillCount++; } else { runFileStream[LEFT_PARTITION].startRunFile(); if (LOGGER.isLoggable(Level.FINE)) { @@ -424,7 +426,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { + bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: " + bufferManager.getNumTuples(RIGHT_PARTITION) + ")"); } + leftSpillCount++; } + spillCount++; } private void continueStream(int diskPartition, ITupleAccessor accessor) throws HyracksDataException { @@ -434,13 +438,27 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Continue with stream (" + diskPartition + ")."); } - spillCount++; - spillReadCount += runFileStream[diskPartition].getReadCount(); - spillWriteCount += runFileStream[diskPartition].getWriteCount(); } - private void unfreezeAndContinue(int frozenPartition, ITupleAccessor accessor, int flushPartition) - throws HyracksDataException { + private void unfreezeAndContinue(int frozenPartition, ITupleAccessor accessor) throws HyracksDataException { + int flushPartition = frozenPartition == LEFT_PARTITION ? RIGHT_PARTITION : LEFT_PARTITION; + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("snapshot(" + frozenPartition + "): " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION] + + " left, left[" + leftSpillCount + " spills, " + + (runFileStream[LEFT_PARTITION].getFileCount() - spillFileCount[LEFT_PARTITION]) + " files, " + + (runFileStream[LEFT_PARTITION].getWriteCount() - spillWriteCount[LEFT_PARTITION]) + " written, " + + (runFileStream[LEFT_PARTITION].getReadCount() - spillReadCount[LEFT_PARTITION]) + " read]. right[" + rightSpillCount + + " spills, " + (runFileStream[RIGHT_PARTITION].getFileCount() - spillFileCount[RIGHT_PARTITION]) + " files, " + + (runFileStream[RIGHT_PARTITION].getWriteCount() - spillWriteCount[RIGHT_PARTITION]) + " written, " + + (runFileStream[RIGHT_PARTITION].getReadCount() - spillReadCount[RIGHT_PARTITION]) + " read]."); + spillFileCount[LEFT_PARTITION] = runFileStream[LEFT_PARTITION].getFileCount(); + spillReadCount[LEFT_PARTITION] = runFileStream[LEFT_PARTITION].getReadCount(); + spillWriteCount[LEFT_PARTITION] = runFileStream[LEFT_PARTITION].getWriteCount(); + spillFileCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getFileCount(); + spillReadCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getReadCount(); + spillWriteCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getWriteCount(); + } + runFileStream[frozenPartition].flushAndStopRunFile(accessor); flushMemory(flushPartition); if ((LEFT_PARTITION == frozenPartition && !status.branch[LEFT_PARTITION].isRunFileReading()) @@ -453,4 +471,10 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { } } + @Override + public void closeInput(int partition) throws HyracksDataException { + // TODO Auto-generated method stub + + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java index 8006790..aa065cd 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java @@ -60,6 +60,7 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner { private final int partition; private final MergeJoinLocks locks; + protected long[] frameCounts = { 0, 0 }; public AbstractMergeJoiner(IHyracksTaskContext ctx, int partition, MergeStatus status, MergeJoinLocks locks, RecordDescriptor leftRd, RecordDescriptor rightRd) throws HyracksDataException { @@ -98,12 +99,13 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner { return TupleStatus.LOADED; } - protected TupleStatus loadMemoryTuple(int joinId) { + protected TupleStatus loadMemoryTuple(int branch) { TupleStatus loaded; - if (inputAccessor[joinId] != null && inputAccessor[joinId].exists()) { + if (inputAccessor[branch] != null && inputAccessor[branch].exists()) { // Still processing frame. + int test = inputAccessor[branch].getTupleCount(); loaded = TupleStatus.LOADED; - } else if (status.branch[joinId].hasMore()) { + } else if (status.branch[branch].hasMore()) { loaded = TupleStatus.UNKNOWN; } else { // No more frames or tuples to process. @@ -113,14 +115,14 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner { } @Override - public void setFrame(int partition, ByteBuffer buffer) { - inputBuffer[partition].clear(); - if (inputBuffer[partition].capacity() < buffer.capacity()) { - inputBuffer[partition].limit(buffer.capacity()); + public void setFrame(int branch, ByteBuffer buffer) { + inputBuffer[branch].clear(); + if (inputBuffer[branch].capacity() < buffer.capacity()) { + inputBuffer[branch].limit(buffer.capacity()); } - inputBuffer[partition].put(buffer.array(), 0, buffer.capacity()); - inputAccessor[partition].reset(inputBuffer[partition]); - inputAccessor[partition].next(); + inputBuffer[branch].put(buffer.array(), 0, buffer.capacity()); + inputAccessor[branch].reset(inputBuffer[branch]); + inputAccessor[branch].next(); + frameCounts[branch]++; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java index 61ddde1..4268ec9 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java @@ -31,4 +31,6 @@ public interface IMergeJoiner { void setFrame(int partition, ByteBuffer buffer); + void closeInput(int partition) throws HyracksDataException; + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java index 5624bb5..cbe1a66 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java @@ -34,6 +34,7 @@ import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; +import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage; /** * The merge join is made up of two operators: left and right. @@ -105,6 +106,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { private final RecordDescriptor leftRd; private MergeJoinTaskState state; private boolean first = true; + int count = 0; public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) { this.ctx = ctx; @@ -141,6 +143,8 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { locks.getLock(partition).lock(); + + count++; if (first) { state.status.branch[LEFT_ACTIVITY_ID].setStageData(); first = false; @@ -171,6 +175,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { if (state.failed) { writer.fail(); } else { + state.joiner.closeInput(LEFT_ACTIVITY_ID); state.joiner.processMergeUsingLeftTuple(writer); state.joiner.closeResult(writer); writer.close(); @@ -180,6 +185,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { } finally { locks.getLock(partition).unlock(); } +// System.err.println("Left next calls: " + count); } } } @@ -188,7 +194,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { private static final long serialVersionUID = 1L; private final ActivityId joinAid; - private MergeJoinLocks locks; + private final MergeJoinLocks locks; public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) { super(id); @@ -202,8 +208,8 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { throws HyracksDataException { locks.setPartitions(nPartitions); RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); - final IMergeJoinChecker mjc = mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys, - partition, ctx); + final IMergeJoinChecker mjc = mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys, partition, + ctx); return new RightDataOperator(ctx, partition, inRecordDesc, mjc); } @@ -215,6 +221,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { private final IMergeJoinChecker mjc; private MergeJoinTaskState state; private boolean first = true; + int count = 0; public RightDataOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc, IMergeJoinChecker mjc) { @@ -250,12 +257,14 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { locks.getLock(partition).lock(); + count++; if (first) { state.status.branch[RIGHT_ACTIVITY_ID].setStageData(); first = false; } try { - while (!state.status.continueRightLoad && state.status.branch[LEFT_ACTIVITY_ID].hasMore()) { + while (!state.status.continueRightLoad + && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) { // Wait for the state to request right frame unless left has finished. locks.getRight(partition).await(); } @@ -289,6 +298,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { } finally { locks.getLock(partition).unlock(); } +// System.err.println("Right next calls: " + count); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java index 03283d3..c1a828f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java @@ -60,8 +60,9 @@ public class MergeJoiner extends AbstractMergeJoiner { private long joinComparisonCount = 0; private long joinResultCount = 0; - private long spillWriteCount = 0; - private long spillReadCount = 0; +// private long spillFileCount = 0; +// private long spillWriteCount = 0; +// private long spillReadCount = 0; private long spillCount = 0; public MergeJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status, MergeJoinLocks locks, @@ -114,12 +115,14 @@ public class MergeJoiner extends AbstractMergeJoiner { resultAppender.write(writer, true); if (LOGGER.isLoggable(Level.WARNING)) { LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount - + " results, " + spillCount + " spills, " + spillWriteCount + " spill frames written, " - + spillReadCount + " spill frames read."); + + " results, " + spillCount + " spills, " + runFileStream.getFileCount() + " files, " + + runFileStream.getWriteCount() + " spill frames written, " + runFileStream.getReadCount() + + " spill frames read."); } } private void flushMemory() throws HyracksDataException { + memoryBuffer.clear(); bufferManager.reset(); } @@ -151,7 +154,11 @@ public class MergeJoiner extends AbstractMergeJoiner { if (status.branch[LEFT_PARTITION].isRunFileReading()) { loaded = loadSpilledTuple(LEFT_PARTITION); if (loaded.isEmpty()) { - continueStream(inputAccessor[LEFT_PARTITION]); + if (status.branch[LEFT_PARTITION].isRunFileWriting() && !status.branch[LEFT_PARTITION].hasMore()) { + unfreezeAndContinue(inputAccessor[LEFT_PARTITION]); + } else { + continueStream(inputAccessor[LEFT_PARTITION]); + } loaded = loadLeftTuple(); } } else { @@ -169,6 +176,13 @@ public class MergeJoiner extends AbstractMergeJoiner { return TupleStatus.LOADED; } + @Override + public void closeInput(int partition) throws HyracksDataException { + if (status.branch[partition].isRunFileWriting()) { + unfreezeAndContinue(inputAccessor[partition]); + } + } + /** * Left * @@ -176,48 +190,54 @@ public class MergeJoiner extends AbstractMergeJoiner { */ @Override public void processMergeUsingLeftTuple(IFrameWriter writer) throws HyracksDataException { - TupleStatus ts = loadLeftTuple(); - while (ts.isLoaded() && (status.branch[RIGHT_PARTITION].hasMore() || memoryHasTuples())) { + TupleStatus leftTs = loadLeftTuple(); + TupleStatus rightTs = loadRightTuple(); + while (leftTs.isLoaded() && (status.branch[RIGHT_PARTITION].hasMore() || memoryHasTuples())) { if (status.branch[LEFT_PARTITION].isRunFileWriting()) { // Left side from disk - processLeftTupleSpill(writer); - ts = loadLeftTuple(); - } else if (loadRightTuple().isLoaded() - && mjc.checkToLoadNextRightTuple(inputAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION])) { + leftTs = processLeftTupleSpill(writer); + } else if (rightTs.isLoaded() && mjc.checkToLoadNextRightTuple(inputAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION])) { // Right side from stream processRightTuple(); + rightTs = loadRightTuple(); } else { // Left side from stream processLeftTuple(writer); - ts = loadLeftTuple(); + leftTs = loadLeftTuple(); } } } - private void processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException { + private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException { + // System.err.print("Spill "); + runFileStream.addToRunFile(inputAccessor[LEFT_PARTITION]); processLeftTuple(writer); + // Memory is empty and we can start processing the run file. if (!memoryHasTuples() && status.branch[LEFT_PARTITION].isRunFileWriting()) { unfreezeAndContinue(inputAccessor[LEFT_PARTITION]); } + return loadLeftTuple(); } private void processLeftTuple(IFrameWriter writer) throws HyracksDataException { + // TuplePrinterUtil.printTuple("Left", inputAccessor[LEFT]); // Check against memory (right) if (memoryHasTuples()) { for (int i = memoryBuffer.size() - 1; i > -1; --i) { memoryAccessor.reset(memoryBuffer.get(i)); - if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), - memoryAccessor, memoryBuffer.get(i).getTupleIndex(), false)) { + if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor, + memoryBuffer.get(i).getTupleIndex(), false)) { // add to result - addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), - memoryAccessor, memoryBuffer.get(i).getTupleIndex(), writer); + addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor, + memoryBuffer.get(i).getTupleIndex(), writer); } joinComparisonCount++; - if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), - memoryAccessor, memoryBuffer.get(i).getTupleIndex())) { + if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor, + memoryBuffer.get(i).getTupleIndex())) { // remove from memory + // TuplePrinterUtil.printTuple("Remove Memory", memoryAccessor, memoryBuffer.get(i).getTupleIndex()); removeFromMemory(memoryBuffer.get(i)); } } @@ -234,30 +254,45 @@ public class MergeJoiner extends AbstractMergeJoiner { return; } } + // TuplePrinterUtil.printTuple("Memory", inputAccessor[RIGHT]); inputAccessor[RIGHT_PARTITION].next(); } private void freezeAndSpill() throws HyracksDataException { +// System.err.println("freezeAndSpill"); + runFileStream.startRunFile(); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine( "Memory is full. Freezing the right branch. (memory tuples: " + bufferManager.getNumTuples() + ")"); } + spillCount++; } private void continueStream(ITupleAccessor accessor) throws HyracksDataException { +// System.err.println("continueStream"); + runFileStream.closeRunFile(); accessor.reset(inputBuffer[LEFT_PARTITION]); accessor.setTupleId(leftStreamIndex); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Continue with left stream."); } - spillCount++; - spillReadCount += runFileStream.getReadCount(); - spillWriteCount += runFileStream.getWriteCount(); } private void unfreezeAndContinue(ITupleAccessor accessor) throws HyracksDataException { +// System.err.println("unfreezeAndContinue"); +// if (LOGGER.isLoggable(Level.WARNING)) { +// LOGGER.warning("snapshot: " + frameCounts[RIGHT] + " right, " + frameCounts[LEFT] + " left, " +// + joinComparisonCount + " comparisons, " + joinResultCount + " results, " + spillCount + " spills, " +// + (runFileStream.getFileCount() - spillFileCount) + " files, " +// + (runFileStream.getWriteCount() - spillWriteCount) + " spill frames written, " +// + (runFileStream.getReadCount() - spillReadCount) + " spill frames read."); +// spillFileCount = runFileStream.getFileCount(); +// spillReadCount = runFileStream.getReadCount(); +// spillWriteCount = runFileStream.getWriteCount(); +// } + runFileStream.flushAndStopRunFile(accessor); flushMemory(); if (!status.branch[LEFT_PARTITION].isRunFileReading()) { @@ -267,6 +302,7 @@ public class MergeJoiner extends AbstractMergeJoiner { if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Unfreezing right partition."); } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java index 3d99d6c..5cc36ce 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java @@ -62,6 +62,7 @@ public class NestedLoopJoin { private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal private BufferInfo tempInfo = new BufferInfo(null, -1, -1); + private final int partition; private long joinComparisonCount = 0; private long joinResultCount = 0; private long spillWriteCount = 0; @@ -104,6 +105,8 @@ public class NestedLoopJoin { .createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString()); runFileWriter = new RunFileWriter(file, ctx.getIOManager()); runFileWriter.open(); + + partition = ctx.getTaskAttemptId().getTaskId().getPartition(); } public void cache(ByteBuffer buffer) throws HyracksDataException { @@ -202,8 +205,9 @@ public class NestedLoopJoin { appender.write(writer, true); if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("NestedLoopJoin statitics: " + joinComparisonCount + " comparisons, " + joinResultCount - + " results, " + spillWriteCount + " frames written, " + spillReadCount + " frames read."); + LOGGER.warning("NestedLoopJoin statitics: " + partition + " partition, " + joinComparisonCount + + " comparisons, " + joinResultCount + " results, " + spillWriteCount + " frames written, " + + spillReadCount + " frames read."); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java index aaaaaf4..042b85e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java @@ -48,6 +48,7 @@ public class RunFileStream { private long runFileCounter = 0; private long readCount = 0; private long writeCount = 0; + private long tupleCount = 0; public RunFileStream(IHyracksTaskContext ctx, String key, IRunFileStreamStatus status) throws HyracksDataException { this.ctx = ctx; @@ -58,6 +59,10 @@ public class RunFileStream { runFileAppender = new FrameTupleAppender(new VSizeFrame(ctx)); } + public long getFileCount() { + return runFileCounter; + } + public long getReadCount() { return readCount; } @@ -67,8 +72,6 @@ public class RunFileStream { } public void startRunFile() throws HyracksDataException { - readCount = 0; - writeCount = 0; runFileCounter++; status.setRunFileWriting(true); @@ -89,7 +92,9 @@ public class RunFileStream { runFileAppender.write(runFileWriter, true); writeCount++; runFileAppender.append(accessor, idx); + tupleCount = 0; } + tupleCount++; } public void openRunFile(ITupleAccessor accessor) throws HyracksDataException {