Shiva Jahangiri has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/3421
Change subject: Refactoring OptimizedHybridHashJoin: introduced RunFilesManager
and PartitionsManager
......................................................................
Refactoring OptimizedHybridHashJoin
Introduced RunFilesManager and PartitionsManager
Change-Id: I3f6d011f8af256b290cc28a04a412bcbd005920a
---
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
A
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
A
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinRunFilesManager.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
6 files changed, 334 insertions(+), 206 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/21/3421/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index 6c08be2..a435919 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -113,4 +113,12 @@
*/
void clearPartition(int partition) throws HyracksDataException;
+ /**
+ * Flushes the particular partition {@code pid} to {@code writer} and
+ * clears it.
+ * @param pid
+ * @throws HyracksDataException
+ */
+ void flushAndClearPartition(int pid, IFrameWriter writer) throws
HyracksDataException;
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 4578c2e..a11eab5 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -125,6 +125,12 @@
}
@Override
+ public void flushAndClearPartition(int pid, IFrameWriter writer) throws
HyracksDataException {
+ flushPartition(pid, writer);
+ clearPartition(pid);
+ }
+
+ @Override
public boolean insertTuple(int partition, byte[] byteArray, int[]
fieldEndOffsets, int start, int size,
TuplePointer pointer) throws HyracksDataException {
int actualSize = calculateActualSize(fieldEndOffsets, size);
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
new file mode 100644
index 0000000..f05f986
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class HashJoinPartitionsManager {
+ private int[] buildPSizeInTups;
+ private int[] probePSizeInTups;
+ private final BitSet spilledStatus; //0=resident, 1=spilled
+
+ HashJoinPartitionsManager(int numberOfPartitions) {
+ this.buildPSizeInTups = new int[numberOfPartitions];
+ this.probePSizeInTups = new int[numberOfPartitions];
+ this.spilledStatus = new BitSet(numberOfPartitions);
+ }
+
+ public BitSet getSpilledStatus() {
+ return spilledStatus;
+ }
+
+ public void setPartitionAsSpilled(int pid) {
+ this.spilledStatus.set(pid);
+ }
+
+ public int getBuildPartitionSizeInTup(int pid) {
+ return (buildPSizeInTups[pid]);
+ }
+
+ public int getProbePartitionSizeInTup(int pid) {
+ return (probePSizeInTups[pid]);
+ }
+
+ public boolean isBuildRelAllInMemory() {
+ return spilledStatus.nextSetBit(0) < 0;
+ }
+
+ public int getMaxBuildPartitionSize() {
+ return Arrays.stream(buildPSizeInTups).max().getAsInt();
+ }
+
+ public int getMaxProbePartitionSize() {
+ return Arrays.stream(probePSizeInTups).max().getAsInt();
+ }
+
+ public void incrementNumOfTuplesByOne(int pid,
OptimizedHybridHashJoin.SIDE whichSide) throws HyracksDataException {
+ switch (whichSide) {
+ case BUILD:
+ buildPSizeInTups[pid]++;
+ break;
+ case PROBE:
+ probePSizeInTups[pid]++;
+ break;
+ default:
+ throw new HyracksDataException("Side has to be either Build or
Probe.");
+ }
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinRunFilesManager.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinRunFilesManager.java
new file mode 100644
index 0000000..9841737
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinRunFilesManager.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+
+public class HashJoinRunFilesManager {
+ private IHyracksTaskContext ctx;
+ private final String buildRelName;
+ private final String probeRelName;
+ private RunFileWriter[] buildRFWriters; //writing spilled build partitions
+
+ private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
+
+ HashJoinRunFilesManager(int numberOfPartitions, IHyracksTaskContext ctx,
String buildRelName, String probeRelName) {
+ this.ctx = ctx;
+ this.buildRelName = buildRelName;
+ this.probeRelName = probeRelName;
+ this.buildRFWriters = new RunFileWriter[numberOfPartitions];
+ this.probeRFWriters = new RunFileWriter[numberOfPartitions];
+ }
+
+ /**
+ * In case of failure happens, we need to clear up the generated temporary
files.
+ */
+ public void clearProbeTempFiles() throws HyracksDataException {
+ for (int i = 0; i < probeRFWriters.length; i++) {
+ if (probeRFWriters[i] != null) {
+ probeRFWriters[i].erase();
+ }
+ }
+ }
+
+ public RunFileReader getBuildRFReader(int pid) throws HyracksDataException
{
+ return ((buildRFWriters[pid] == null) ? null :
(buildRFWriters[pid]).createDeleteOnCloseReader());
+ }
+
+ public RunFileReader getProbeRFReader(int pid) throws HyracksDataException
{
+ return ((probeRFWriters[pid] == null) ? null :
(probeRFWriters[pid]).createDeleteOnCloseReader());
+ }
+
+ private void closeBuildPartition(int pid) throws HyracksDataException {
+ if (buildRFWriters[pid] == null) {
+ throw new HyracksDataException("Tried to close the non-existing
file writer.");
+ }
+ buildRFWriters[pid].close();
+ }
+
+ public void setRunFileWriter(int pid, OptimizedHybridHashJoin.SIDE
whichSide, RunFileWriter rfw) {
+ RunFileWriter runFileWriter = null;
+ switch (whichSide) {
+ case BUILD:
+ buildRFWriters[pid] = rfw;
+ break;
+ case PROBE:
+ probeRFWriters[pid] = rfw;
+ break;
+ }
+ }
+
+ public RunFileWriter getSpillWriterOrCreateIfNotExist(int pid,
OptimizedHybridHashJoin.SIDE whichSide)
+ throws HyracksDataException {
+ RunFileWriter[] runFileWriters = null;
+ String refName = null;
+ switch (whichSide) {
+ case BUILD:
+ runFileWriters = buildRFWriters;
+ refName = buildRelName;
+ break;
+ case PROBE:
+ refName = probeRelName;
+ runFileWriters = probeRFWriters;
+ break;
+ }
+ RunFileWriter writer = runFileWriters[pid];
+ if (writer == null) {
+ FileReference file =
ctx.getJobletContext().createManagedWorkspaceFile(refName);
+ writer = new RunFileWriter(file, ctx.getIoManager());
+ writer.open();
+ runFileWriters[pid] = writer;
+ }
+ return writer;
+ }
+
+ /**
+ * In case of failure happens, we need to clear up the generated temporary
files.
+ */
+ public void clearBuildTempFiles() throws HyracksDataException {
+ for (int i = 0; i < buildRFWriters.length; i++) {
+ if (buildRFWriters[i] != null) {
+ buildRFWriters[i].erase();
+ }
+ }
+ }
+
+ public void closeAllSpilledPartitions(OptimizedHybridHashJoin.SIDE
whichSide) throws HyracksDataException {
+ RunFileWriter[] runFileWriters = null;
+ switch (whichSide) {
+ case BUILD:
+ runFileWriters = buildRFWriters;
+ break;
+ case PROBE:
+ runFileWriters = probeRFWriters;
+ break;
+ }
+
+ if (runFileWriters != null) {
+ for (RunFileWriter runFileWriter : runFileWriters) {
+ if (runFileWriter != null) {
+ runFileWriter.close();
+ }
+ }
+ }
+ }
+
+ public void closeSpilledPartition(int pid, OptimizedHybridHashJoin.SIDE
whichSide) throws HyracksDataException {
+ RunFileWriter runFileWriter = null;
+ switch (whichSide) {
+ case BUILD:
+ runFileWriter = buildRFWriters[pid];
+ break;
+ case PROBE:
+ runFileWriter = probeRFWriters[pid];
+ break;
+ }
+ if (runFileWriter != null) {
+ runFileWriter.close();
+ }
+ }
+
+ public RunFileWriter[] getBuildRFWriters() {
+ return buildRFWriters;
+ }
+
+ public RunFileWriter[] getProbeRFWriters() {
+ return probeRFWriters;
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index c78e0dc..bb27fdd 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -32,7 +32,6 @@
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.io.RunFileReader;
@@ -56,6 +55,8 @@
// Used for special probe BigObject which can not be held into the Join
memory
private FrameTupleAppender bigProbeFrameAppender;
+ private HashJoinPartitionsManager partitionManager;
+ private HashJoinRunFilesManager runFilesManager;
public enum SIDE {
BUILD,
@@ -74,14 +75,10 @@
private final RecordDescriptor buildRd;
private final RecordDescriptor probeRd;
- private RunFileWriter[] buildRFWriters; //writing spilled build partitions
- private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
private final IPredicateEvaluator predEvaluator;
private final boolean isLeftOuter;
private final IMissingWriter[] nonMatchWriters;
- private final BitSet spilledStatus; //0=resident, 1=spilled
private final int numOfPartitions;
private final int memSizeInFrames;
private InMemoryHashJoin inMemJoiner; //Used for joining resident
partitions
@@ -98,16 +95,17 @@
private boolean isReversed; //Added for handling correct calling for
predicate-evaluator upon recursive calls that cause role-reversal
// stats information
- private int[] buildPSizeInTups;
+
private IFrame reloadBuffer;
private TuplePointer tempPtr = new TuplePointer(); // this is a reusable
object to store the pointer,which is not used anywhere.
// we mainly use it to
match the corresponding function signature.
- private int[] probePSizeInTups;
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int
memSizeInFrames, int numOfPartitions,
String probeRelName, String buildRelName, ITuplePairComparator
comparator, RecordDescriptor probeRd,
RecordDescriptor buildRd, ITuplePartitionComputer probeHpc,
ITuplePartitionComputer buildHpc,
IPredicateEvaluator predEval, boolean isLeftOuter,
IMissingWriterFactory[] nullWriterFactories1) {
+ this.partitionManager = new HashJoinPartitionsManager(numOfPartitions);
+ this.runFilesManager = new HashJoinRunFilesManager(numOfPartitions,
ctx, buildRelName, probeRelName);
this.ctx = ctx;
this.memSizeInFrames = memSizeInFrames;
this.buildRd = buildRd;
@@ -117,10 +115,7 @@
this.comparator = comparator;
this.buildRelName = buildRelName;
this.probeRelName = probeRelName;
-
this.numOfPartitions = numOfPartitions;
- this.buildRFWriters = new RunFileWriter[numOfPartitions];
- this.probeRFWriters = new RunFileWriter[numOfPartitions];
this.accessorBuild = new FrameTupleAccessor(buildRd);
this.accessorProbe = new FrameTupleAccessor(probeRd);
@@ -128,8 +123,6 @@
this.predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
this.isReversed = false;
-
- this.spilledStatus = new BitSet(numOfPartitions);
this.nonMatchWriters = isLeftOuter ? new
IMissingWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
@@ -142,12 +135,10 @@
public void initBuild() throws HyracksDataException {
framePool = new DeallocatableFramePool(ctx, memSizeInFrames *
ctx.getInitialFrameSize());
bufferManagerForHashTable = new
FramePoolBackedFrameBufferManager(framePool);
- bufferManager = new VPartitionTupleBufferManager(
-
PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
- numOfPartitions, framePool);
- spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager,
spilledStatus);
- spilledStatus.clear();
- buildPSizeInTups = new int[numOfPartitions];
+ bufferManager = new
VPartitionTupleBufferManager(PreferToSpillFullyOccupiedFramePolicy
+
.createAtMostOneFrameForSpilledPartitionConstrain(partitionManager.getSpilledStatus()),
numOfPartitions,
+ framePool);
+ spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager,
partitionManager.getSpilledStatus());
}
public void build(ByteBuffer buffer) throws HyracksDataException {
@@ -157,15 +148,14 @@
for (int i = 0; i < tupleCount; ++i) {
int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
processTuple(i, pid);
- buildPSizeInTups[pid]++;
}
-
}
private void processTuple(int tid, int pid) throws HyracksDataException {
while (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
selectAndSpillVictim(pid);
}
+ partitionManager.incrementNumOfTuplesByOne(pid, SIDE.BUILD);
}
private void selectAndSpillVictim(int pid) throws HyracksDataException {
@@ -178,40 +168,8 @@
}
private void spillPartition(int pid) throws HyracksDataException {
- RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid,
SIDE.BUILD);
- bufferManager.flushPartition(pid, writer);
- bufferManager.clearPartition(pid);
- spilledStatus.set(pid);
- }
-
- private void closeBuildPartition(int pid) throws HyracksDataException {
- if (buildRFWriters[pid] == null) {
- throw new HyracksDataException("Tried to close the non-existing
file writer.");
- }
- buildRFWriters[pid].close();
- }
-
- private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE
whichSide) throws HyracksDataException {
- RunFileWriter[] runFileWriters = null;
- String refName = null;
- switch (whichSide) {
- case BUILD:
- runFileWriters = buildRFWriters;
- refName = buildRelName;
- break;
- case PROBE:
- refName = probeRelName;
- runFileWriters = probeRFWriters;
- break;
- }
- RunFileWriter writer = runFileWriters[pid];
- if (writer == null) {
- FileReference file =
ctx.getJobletContext().createManagedWorkspaceFile(refName);
- writer = new RunFileWriter(file, ctx.getIoManager());
- writer.open();
- runFileWriters[pid] = writer;
- }
- return writer;
+ bufferManager.flushAndClearPartition(pid,
runFilesManager.getSpillWriterOrCreateIfNotExist(pid, SIDE.BUILD));
+ partitionManager.setPartitionAsSpilled(pid);
}
public void closeBuild() throws HyracksDataException {
@@ -229,47 +187,6 @@
}
/**
- * In case of failure happens, we need to clear up the generated temporary
files.
- */
- public void clearBuildTempFiles() throws HyracksDataException {
- for (int i = 0; i < buildRFWriters.length; i++) {
- if (buildRFWriters[i] != null) {
- buildRFWriters[i].erase();
- }
- }
- }
-
- private void closeAllSpilledPartitions(SIDE whichSide) throws
HyracksDataException {
- RunFileWriter[] runFileWriters = null;
- switch (whichSide) {
- case BUILD:
- runFileWriters = buildRFWriters;
- break;
- case PROBE:
- runFileWriters = probeRFWriters;
- break;
- }
- try {
- for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid <
numOfPartitions; pid =
- spilledStatus.nextSetBit(pid + 1)) {
- if (bufferManager.getNumTuples(pid) > 0) {
- bufferManager.flushPartition(pid,
getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
- bufferManager.clearPartition(pid);
- }
- }
- } finally {
- // Force to close all run file writers.
- if (runFileWriters != null) {
- for (RunFileWriter runFileWriter : runFileWriters) {
- if (runFileWriter != null) {
- runFileWriter.close();
- }
- }
- }
- }
- }
-
- /**
* Makes the space for the hash table. If there is no enough space, one or
more partitions will be spilled
* to the disk until the hash table can fit into the memory. After this,
bring back spilled partitions
* if there is available memory.
@@ -280,6 +197,7 @@
private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws
HyracksDataException {
// we need number of |spilledPartitions| buffers to store the probe
data
int frameSize = ctx.getInitialFrameSize();
+ BitSet spilledStatus = partitionManager.getSpilledStatus();
long freeSpace = (long) (memSizeInFrames -
spilledStatus.cardinality()) * frameSize;
// For partitions in main memory, we deduct their size from the free
space.
@@ -287,7 +205,7 @@
for (int p = spilledStatus.nextClearBit(0); p >= 0 && p <
numOfPartitions; p =
spilledStatus.nextClearBit(p + 1)) {
freeSpace -= bufferManager.getPhysicalSize(p);
- inMemTupCount += buildPSizeInTups[p];
+ inMemTupCount += partitionManager.getBuildPartitionSizeInTup(p);
}
// Calculates the expected hash table size for the given number of
tuples in main memory
@@ -309,11 +227,11 @@
// There is a suitable one. We spill that partition to the
disk.
long hashTableSizeDecrease =
-SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
- -buildPSizeInTups[pidToSpill], frameSize);
+
-partitionManager.getBuildPartitionSizeInTup(pidToSpill), frameSize);
freeSpace = freeSpace +
bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
- inMemTupCount -= buildPSizeInTups[pidToSpill];
+ inMemTupCount -=
partitionManager.getBuildPartitionSizeInTup(pidToSpill);
spillPartition(pidToSpill);
- closeBuildPartition(pidToSpill);
+ runFilesManager.closeSpilledPartition(pidToSpill, SIDE.BUILD);
moreSpilled = true;
} else {
// There is no single suitable partition. So, we need to spill
multiple partitions to the disk
@@ -321,12 +239,12 @@
for (int p = spilledStatus.nextClearBit(0); p >= 0 && p <
numOfPartitions; p =
spilledStatus.nextClearBit(p + 1)) {
int spaceToBeReturned = bufferManager.getPhysicalSize(p);
- int numberOfTuplesToBeSpilled = buildPSizeInTups[p];
+ int numberOfTuplesToBeSpilled =
partitionManager.getBuildPartitionSizeInTup(p);
if (spaceToBeReturned == 0 || numberOfTuplesToBeSpilled ==
0) {
continue;
}
spillPartition(p);
- closeBuildPartition(p);
+ runFilesManager.closeSpilledPartition(p, SIDE.BUILD);
moreSpilled = true;
// Since the number of tuples in memory has been decreased,
// the hash table size will be decreased, too.
@@ -353,13 +271,13 @@
// Brings back some partitions if there is enough free space.
int pid = 0;
while ((pid = selectPartitionsToReload(freeSpace, pid, inMemTupCount))
>= 0) {
- if (!loadSpilledPartitionToMem(pid, buildRFWriters[pid])) {
+ if (!loadSpilledPartitionToMem(pid,
runFilesManager.getBuildRFWriters()[pid])) {
break;
}
- long expectedHashTableByteSizeIncrease = SerializableHashTable
- .calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
buildPSizeInTups[pid], frameSize);
+ long expectedHashTableByteSizeIncrease =
SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
+ inMemTupCount,
partitionManager.getBuildPartitionSizeInTup(pid), frameSize);
freeSpace = freeSpace - bufferManager.getPhysicalSize(pid) -
expectedHashTableByteSizeIncrease;
- inMemTupCount += buildPSizeInTups[pid];
+ inMemTupCount += partitionManager.getBuildPartitionSizeInTup(pid);
// Adjusts the hash table size
hashTableByteSizeForInMemTuples +=
expectedHashTableByteSizeIncrease;
}
@@ -377,14 +295,16 @@
long minSpaceAfterSpill = (long) memSizeInFrames * frameSize;
int minSpaceAfterSpillPartID = -1;
+ BitSet spilledStatus = partitionManager.getSpilledStatus();
for (int p = spilledStatus.nextClearBit(0); p >= 0 && p <
numOfPartitions; p =
spilledStatus.nextClearBit(p + 1)) {
- if (buildPSizeInTups[p] == 0 || bufferManager.getPhysicalSize(p)
== 0) {
+ if (partitionManager.getBuildPartitionSizeInTup(p) == 0 ||
bufferManager.getPhysicalSize(p) == 0) {
continue;
}
// We put minus since the method returns a negative value to
represent a newly reclaimed space.
- spaceAfterSpill = currentFreeSpace +
bufferManager.getPhysicalSize(p) + (-SerializableHashTable
-
.calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount,
-buildPSizeInTups[p], frameSize));
+ spaceAfterSpill = currentFreeSpace +
bufferManager.getPhysicalSize(p)
+ +
(-SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount,
+ -partitionManager.getBuildPartitionSizeInTup(p),
frameSize));
if (spaceAfterSpill == 0) {
// Found the perfect one. Just returns this partition.
return p;
@@ -398,13 +318,14 @@
}
private int selectPartitionsToReload(long freeSpace, int pid, int
inMemTupCount) {
+ BitSet spilledStatus = partitionManager.getSpilledStatus();
for (int i = spilledStatus.nextSetBit(pid); i >= 0 && i <
numOfPartitions; i =
spilledStatus.nextSetBit(i + 1)) {
- int spilledTupleCount = buildPSizeInTups[i];
+ int spilledTupleCount =
partitionManager.getBuildPartitionSizeInTup(i);
// Expected hash table size increase after reloading this partition
long expectedHashTableByteSizeIncrease =
SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
inMemTupCount, spilledTupleCount,
ctx.getInitialFrameSize());
- if (freeSpace >= buildRFWriters[i].getFileSize() +
expectedHashTableByteSizeIncrease) {
+ if (freeSpace >=
runFilesManager.getBuildRFWriters()[i].getFileSize() +
expectedHashTableByteSizeIncrease) {
return i;
}
}
@@ -435,8 +356,8 @@
} finally {
r.close();
}
- spilledStatus.set(pid, false);
- buildRFWriters[pid] = null;
+ partitionManager.getSpilledStatus().set(pid, false);
+ runFilesManager.setRunFileWriter(pid, SIDE.BUILD, null);
return true;
}
@@ -450,7 +371,7 @@
private void loadDataInMemJoin() throws HyracksDataException {
for (int pid = 0; pid < numOfPartitions; pid++) {
- if (!spilledStatus.get(pid)) {
+ if (!partitionManager.getSpilledStatus().get(pid)) {
bufferManager.flushPartition(pid, new IFrameWriter() {
@Override
public void open() throws HyracksDataException {
@@ -476,18 +397,11 @@
}
}
- public void initProbe() throws HyracksDataException {
-
- probePSizeInTups = new int[numOfPartitions];
- probeRFWriters = new RunFileWriter[numOfPartitions];
-
- }
-
public void probe(ByteBuffer buffer, IFrameWriter writer) throws
HyracksDataException {
accessorProbe.reset(buffer);
int tupleCount = accessorProbe.getTupleCount();
- if (isBuildRelAllInMemory()) {
+ if (partitionManager.isBuildRelAllInMemory()) {
inMemJoiner.join(buffer, writer);
return;
}
@@ -495,8 +409,8 @@
for (int i = 0; i < tupleCount; ++i) {
int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
- if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has
potential match from previous phase
- if (spilledStatus.get(pid)) { //pid is Spilled
+ if (partitionManager.getBuildPartitionSizeInTup(pid) > 0 ||
isLeftOuter) { //Tuple has potential match from previous phase
+ if (partitionManager.getSpilledStatus().get(pid)) { //pid is
Spilled
while (!bufferManager.insertTuple(pid, accessorProbe, i,
tempPtr)) {
int victim = pid;
if (bufferManager.getNumTuples(pid) == 0) { // current
pid is empty, choose the biggest one
@@ -506,38 +420,46 @@
flushBigProbeObjectToDisk(pid, accessorProbe, i);
break;
}
- RunFileWriter runFileWriter =
getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
+ RunFileWriter runFileWriter =
+
runFilesManager.getSpillWriterOrCreateIfNotExist(victim, SIDE.PROBE);
bufferManager.flushPartition(victim, runFileWriter);
bufferManager.clearPartition(victim);
}
} else { //pid is Resident
inMemJoiner.join(i, writer);
}
- probePSizeInTups[pid]++;
+ partitionManager.incrementNumOfTuplesByOne(pid, SIDE.PROBE);
}
}
}
private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor
accessorProbe, int i)
throws HyracksDataException {
+        //TODO: The variable-size frame is not using the join memory budget,
which can lead to using more memory than the
+        // memory budget. This needs to be fixed either by flushing the tuple
right away without using a frame, or by
+        // allocating frames to it by spilling some partitions if possible.
if (bigProbeFrameAppender == null) {
bigProbeFrameAppender = new FrameTupleAppender(new
VSizeFrame(ctx));
}
- RunFileWriter runFileWriter =
getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
+ RunFileWriter runFileWriter =
runFilesManager.getSpillWriterOrCreateIfNotExist(pid, SIDE.PROBE);
if (!bigProbeFrameAppender.append(accessorProbe, i)) {
throw new HyracksDataException("The given tuple is too big");
}
bigProbeFrameAppender.write(runFileWriter, true);
}
- private boolean isBuildRelAllInMemory() {
- return spilledStatus.nextSetBit(0) < 0;
- }
-
public void completeProbe(IFrameWriter writer) throws HyracksDataException
{
//We do NOT join the spilled partitions here, that decision is made at
the descriptor level
//(which join technique to use)
inMemJoiner.completeJoin(writer);
+ }
+
+ public void closeAllSpilledPartitions(OptimizedHybridHashJoin.SIDE
whichSide) throws HyracksDataException {
+ for (int pid = partitionManager.getSpilledStatus().nextSetBit(0); pid
>= 0 && pid < numOfPartitions; pid =
+ partitionManager.getSpilledStatus().nextSetBit(pid + 1)) {
+ bufferManager.flushAndClearPartition(pid,
runFilesManager.getSpillWriterOrCreateIfNotExist(pid, whichSide));
+ runFilesManager.closeSpilledPartition(pid, whichSide);
+ }
}
public void releaseResource() throws HyracksDataException {
@@ -549,58 +471,15 @@
bufferManagerForHashTable = null;
}
- /**
- * In case of failure happens, we need to clear up the generated temporary
files.
- */
- public void clearProbeTempFiles() throws HyracksDataException {
- for (int i = 0; i < probeRFWriters.length; i++) {
- if (probeRFWriters[i] != null) {
- probeRFWriters[i].erase();
- }
- }
- }
-
- public RunFileReader getBuildRFReader(int pid) throws HyracksDataException
{
- return ((buildRFWriters[pid] == null) ? null :
(buildRFWriters[pid]).createDeleteOnCloseReader());
- }
-
- public int getBuildPartitionSizeInTup(int pid) {
- return (buildPSizeInTups[pid]);
- }
-
- public RunFileReader getProbeRFReader(int pid) throws HyracksDataException
{
- return ((probeRFWriters[pid] == null) ? null :
(probeRFWriters[pid]).createDeleteOnCloseReader());
- }
-
- public int getProbePartitionSizeInTup(int pid) {
- return (probePSizeInTups[pid]);
- }
-
- public int getMaxBuildPartitionSize() {
- int max = buildPSizeInTups[0];
- for (int i = 1; i < buildPSizeInTups.length; i++) {
- if (buildPSizeInTups[i] > max) {
- max = buildPSizeInTups[i];
- }
- }
- return max;
- }
-
- public int getMaxProbePartitionSize() {
- int max = probePSizeInTups[0];
- for (int i = 1; i < probePSizeInTups.length; i++) {
- if (probePSizeInTups[i] > max) {
- max = probePSizeInTups[i];
- }
- }
- return max;
- }
-
- public BitSet getPartitionStatus() {
- return spilledStatus;
- }
-
public void setIsReversed(boolean b) {
this.isReversed = b;
}
+
+ public HashJoinPartitionsManager getPartitionManager() {
+ return partitionManager;
+ }
+
+ public HashJoinRunFilesManager getRunFilesManager() {
+ return runFilesManager;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 2fd17da..daa3a35 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -305,7 +305,7 @@
if (state.hybridHJ != null) {
state.hybridHJ.closeBuild();
if (isFailed) {
- state.hybridHJ.clearBuildTempFiles();
+
state.hybridHJ.getRunFilesManager().clearBuildTempFiles();
} else {
ctx.setStateObject(state);
if (LOGGER.isTraceEnabled()) {
@@ -381,7 +381,6 @@
new TaskId(new ActivityId(getOperatorId(),
BUILD_AND_PARTITION_ACTIVITY_ID), partition));
writer.open();
- state.hybridHJ.initProbe();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("OptimizedHybridHashJoin is starting the
probe phase.");
@@ -404,8 +403,8 @@
if (failed) {
try {
// Clear temp files if fail() was called.
- state.hybridHJ.clearBuildTempFiles();
- state.hybridHJ.clearProbeTempFiles();
+
state.hybridHJ.getRunFilesManager().clearBuildTempFiles();
+
state.hybridHJ.getRunFilesManager().clearProbeTempFiles();
} finally {
writer.close(); // writer should always be closed.
}
@@ -418,12 +417,12 @@
} finally {
state.hybridHJ.releaseResource();
}
- BitSet partitionStatus =
state.hybridHJ.getPartitionStatus();
+ BitSet partitionStatus =
state.hybridHJ.getPartitionManager().getSpilledStatus();
rPartbuff.reset();
for (int pid = partitionStatus.nextSetBit(0); pid >=
0; pid =
partitionStatus.nextSetBit(pid + 1)) {
- RunFileReader bReader =
state.hybridHJ.getBuildRFReader(pid);
- RunFileReader pReader =
state.hybridHJ.getProbeRFReader(pid);
+ RunFileReader bReader =
state.hybridHJ.getRunFilesManager().getBuildRFReader(pid);
+ RunFileReader pReader =
state.hybridHJ.getRunFilesManager().getProbeRFReader(pid);
if (bReader == null || pReader == null) {
if (isLeftOuter && pReader != null) {
@@ -431,8 +430,8 @@
}
continue;
}
- int bSize =
state.hybridHJ.getBuildPartitionSizeInTup(pid);
- int pSize =
state.hybridHJ.getProbePartitionSizeInTup(pid);
+ int bSize =
state.hybridHJ.getPartitionManager().getBuildPartitionSizeInTup(pid);
+ int pSize =
state.hybridHJ.getPartitionManager().getProbePartitionSizeInTup(pid);
joinPartitionPair(bReader, pReader, bSize, pSize,
1);
}
} catch (Exception e) {
@@ -440,8 +439,8 @@
// to send the failure signal downstream when a throwable is thrown.
writer.fail();
// Clear temp files, as neither this.fail() nor this.close()
will be called again after close().
- state.hybridHJ.clearBuildTempFiles();
- state.hybridHJ.clearProbeTempFiles();
+
state.hybridHJ.getRunFilesManager().clearBuildTempFiles();
+
state.hybridHJ.getRunFilesManager().clearProbeTempFiles();
// Re-throw whatever is caught.
throw e;
} finally {
@@ -587,7 +586,6 @@
probeSideReader.open();
rPartbuff.reset();
try {
- rHHj.initProbe();
while (probeSideReader.nextFrame(rPartbuff)) {
rHHj.probe(rPartbuff.getBuffer(), writer);
}
@@ -601,11 +599,11 @@
}
try {
- int maxAfterBuildSize =
rHHj.getMaxBuildPartitionSize();
- int maxAfterProbeSize =
rHHj.getMaxProbePartitionSize();
+ int maxAfterBuildSize =
rHHj.getPartitionManager().getMaxBuildPartitionSize();
+ int maxAfterProbeSize =
rHHj.getPartitionManager().getMaxProbePartitionSize();
int afterMax = Math.max(maxAfterBuildSize,
maxAfterProbeSize);
- BitSet rPStatus = rHHj.getPartitionStatus();
+ BitSet rPStatus =
rHHj.getPartitionManager().getSpilledStatus();
if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD *
beforeMax))) {
//Case 2.1.1 - Keep applying HHJ
if (LOGGER.isDebugEnabled()) {
@@ -613,10 +611,10 @@
+ "(isLeftOuter || build<probe) -
[Level " + level + "]");
}
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0;
rPid = rPStatus.nextSetBit(rPid + 1)) {
- RunFileReader rbrfw =
rHHj.getBuildRFReader(rPid);
- RunFileReader rprfw =
rHHj.getProbeRFReader(rPid);
- int rbSizeInTuple =
rHHj.getBuildPartitionSizeInTup(rPid);
- int rpSizeInTuple =
rHHj.getProbePartitionSizeInTup(rPid);
+ RunFileReader rbrfw =
rHHj.getRunFilesManager().getBuildRFReader(rPid);
+ RunFileReader rprfw =
rHHj.getRunFilesManager().getProbeRFReader(rPid);
+ int rbSizeInTuple =
rHHj.getPartitionManager().getBuildPartitionSizeInTup(rPid);
+ int rpSizeInTuple =
rHHj.getPartitionManager().getProbePartitionSizeInTup(rPid);
if (rbrfw == null || rprfw == null) {
if (isLeftOuter && rprfw != null) {
@@ -639,8 +637,8 @@
+ "(isLeftOuter || build<probe) -
[Level " + level + "]");
}
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0;
rPid = rPStatus.nextSetBit(rPid + 1)) {
- RunFileReader rbrfw =
rHHj.getBuildRFReader(rPid);
- RunFileReader rprfw =
rHHj.getProbeRFReader(rPid);
+ RunFileReader rbrfw =
rHHj.getRunFilesManager().getBuildRFReader(rPid);
+ RunFileReader rprfw =
rHHj.getRunFilesManager().getProbeRFReader(rPid);
if (rbrfw == null || rprfw == null) {
if (isLeftOuter && rprfw != null) {
@@ -650,8 +648,8 @@
continue;
}
- int buildSideInTups =
rHHj.getBuildPartitionSizeInTup(rPid);
- int probeSideInTups =
rHHj.getProbePartitionSizeInTup(rPid);
+ int buildSideInTups =
rHHj.getPartitionManager().getBuildPartitionSizeInTup(rPid);
+ int probeSideInTups =
rHHj.getPartitionManager().getProbePartitionSizeInTup(rPid);
// NLJ order is outer + inner, the order is
reversed from the other joins
if (isLeftOuter || probeSideInTups <
buildSideInTups) {
//checked-modified
@@ -665,8 +663,8 @@
} catch (Exception e) {
// Make sure that temporary run files generated in
recursive hybrid hash joins
// are closed and deleted.
- rHHj.clearBuildTempFiles();
- rHHj.clearProbeTempFiles();
+ rHHj.getRunFilesManager().clearBuildTempFiles();
+ rHHj.getRunFilesManager().clearProbeTempFiles();
throw e;
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/3421
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I3f6d011f8af256b290cc28a04a412bcbd005920a
Gerrit-Change-Number: 3421
Gerrit-PatchSet: 1
Gerrit-Owner: Shiva Jahangiri <[email protected]>