Shiva Jahangiri has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3422 )
Change subject: Step1-Making partitionManager for hash join
......................................................................

Step1-Making partitionManager for hash join

Change-Id: I7a5609f6709efa427769da33c3a3b93233984fd8
---
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionManager.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
2 files changed, 95 insertions(+), 25 deletions(-)


git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/22/3422/1

diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionManager.java
new file mode 100644
index 0000000..59d90d6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join;
+
+import java.util.BitSet;
+
+/**
+ * Tracks which partitions of a hash join are spilled to disk and which are memory-resident.
+ * A set bit in {@link #getSpilledStatus()} means the partition is spilled; a clear bit means it is resident.
+ */
+public class HashJoinPartitionManager {
+
+    private final int numOfPartitions;
+    private final BitSet spilledStatus; //0=resident, 1=spilled
+
+    /**
+     * @param numOfPartitions number of partitions; valid partition ids are {@code [0, numOfPartitions)}
+     */
+    public HashJoinPartitionManager(int numOfPartitions) {
+        this.numOfPartitions = numOfPartitions;
+        this.spilledStatus = new BitSet(numOfPartitions);
+    }
+
+    /**
+     * Returns the spilled-status bit set itself (not a copy), so changes made by callers are visible here.
+     */
+    public BitSet getSpilledStatus() {
+        return spilledStatus;
+    }
+
+    /**
+     * Marks partition {@code pid} as spilled.
+     * <p>
+     * The range check uses {@code numOfPartitions}, not {@link BitSet#length()}: {@code length()} is the
+     * index of the highest set bit plus one (0 for an empty set), so on an empty set only pid 0 would pass.
+     *
+     * @param pid partition id
+     * @throws IndexOutOfBoundsException if {@code pid} is outside {@code [0, numOfPartitions)}
+     */
+    public void setSpilledStatus(int pid) {
+        if (pid < 0 || pid >= numOfPartitions) {
+            // Fail loudly: silently skipping would leave an already-flushed partition marked as resident.
+            throw new IndexOutOfBoundsException("pid " + pid + " out of range [0, " + numOfPartitions + ")");
+        }
+        spilledStatus.set(pid);
+    }
+
+    /**
+     * Marks every partition as memory-resident.
+     */
+    public void clearSpilledStatus() {
+        spilledStatus.clear();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java index c78e0dc..e50b19a 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java @@ -56,6 +56,7 @@ // Used for special probe BigObject which can not be held into the Join memory private FrameTupleAppender bigProbeFrameAppender; + private final HashJoinPartitionManager partitionManager; public enum SIDE { BUILD, @@ -81,7 +82,6 @@ private final boolean isLeftOuter; private final IMissingWriter[] nonMatchWriters; - private final BitSet spilledStatus; //0=resident, 1=spilled private final int numOfPartitions; private final int memSizeInFrames; private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions @@ -108,6 +108,7 @@ String probeRelName, String buildRelName, ITuplePairComparator comparator, RecordDescriptor probeRd, RecordDescriptor buildRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) { +
this.partitionManager = new HashJoinPartitionManager(numOfPartitions); this.ctx = ctx; this.memSizeInFrames = memSizeInFrames; this.buildRd = buildRd; @@ -129,8 +130,6 @@ this.isLeftOuter = isLeftOuter; this.isReversed = false; - this.spilledStatus = new BitSet(numOfPartitions); - this.nonMatchWriters = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null; if (isLeftOuter) { for (int i = 0; i < nullWriterFactories1.length; i++) { @@ -142,11 +141,11 @@ public void initBuild() throws HyracksDataException { framePool = new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize()); bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool); - bufferManager = new VPartitionTupleBufferManager( - PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus), - numOfPartitions, framePool); - spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, spilledStatus); - spilledStatus.clear(); + bufferManager = new VPartitionTupleBufferManager(PreferToSpillFullyOccupiedFramePolicy + .createAtMostOneFrameForSpilledPartitionConstrain(partitionManager.getSpilledStatus()), numOfPartitions, + framePool); + spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, partitionManager.getSpilledStatus()); + partitionManager.clearSpilledStatus(); buildPSizeInTups = new int[numOfPartitions]; } @@ -181,7 +180,7 @@ RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD); bufferManager.flushPartition(pid, writer); bufferManager.clearPartition(pid); - spilledStatus.set(pid); + partitionManager.setSpilledStatus(pid); } private void closeBuildPartition(int pid) throws HyracksDataException { @@ -250,8 +249,8 @@ break; } try { - for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid = - spilledStatus.nextSetBit(pid + 1)) { + for (int pid = partitionManager.getSpilledStatus().nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid = + 
partitionManager.getSpilledStatus().nextSetBit(pid + 1)) { if (bufferManager.getNumTuples(pid) > 0) { bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide)); bufferManager.clearPartition(pid); @@ -280,12 +279,12 @@ private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException { // we need number of |spilledPartitions| buffers to store the probe data int frameSize = ctx.getInitialFrameSize(); - long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize; + long freeSpace = (long) (memSizeInFrames - partitionManager.getSpilledStatus().cardinality()) * frameSize; // For partitions in main memory, we deduct their size from the free space. int inMemTupCount = 0; - for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p = - spilledStatus.nextClearBit(p + 1)) { + for (int p = partitionManager.getSpilledStatus().nextClearBit(0); p >= 0 && p < numOfPartitions; p = + partitionManager.getSpilledStatus().nextClearBit(p + 1)) { freeSpace -= bufferManager.getPhysicalSize(p); inMemTupCount += buildPSizeInTups[p]; } @@ -318,8 +317,8 @@ } else { // There is no single suitable partition. So, we need to spill multiple partitions to the disk // in order to accommodate the hash table. 
- for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p = - spilledStatus.nextClearBit(p + 1)) { + for (int p = partitionManager.getSpilledStatus().nextClearBit(0); p >= 0 && p < numOfPartitions; p = + partitionManager.getSpilledStatus().nextClearBit(p + 1)) { int spaceToBeReturned = bufferManager.getPhysicalSize(p); int numberOfTuplesToBeSpilled = buildPSizeInTups[p]; if (spaceToBeReturned == 0 || numberOfTuplesToBeSpilled == 0) { @@ -377,8 +376,8 @@ long minSpaceAfterSpill = (long) memSizeInFrames * frameSize; int minSpaceAfterSpillPartID = -1; - for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p = - spilledStatus.nextClearBit(p + 1)) { + for (int p = partitionManager.getSpilledStatus().nextClearBit(0); p >= 0 && p < numOfPartitions; p = + partitionManager.getSpilledStatus().nextClearBit(p + 1)) { if (buildPSizeInTups[p] == 0 || bufferManager.getPhysicalSize(p) == 0) { continue; } @@ -398,8 +397,8 @@ } private int selectPartitionsToReload(long freeSpace, int pid, int inMemTupCount) { - for (int i = spilledStatus.nextSetBit(pid); i >= 0 && i < numOfPartitions; i = - spilledStatus.nextSetBit(i + 1)) { + for (int i = partitionManager.getSpilledStatus().nextSetBit(pid); i >= 0 && i < numOfPartitions; i = + partitionManager.getSpilledStatus().nextSetBit(i + 1)) { int spilledTupleCount = buildPSizeInTups[i]; // Expected hash table size increase after reloading this partition long expectedHashTableByteSizeIncrease = SerializableHashTable.calculateByteSizeDeltaForTableSizeChange( @@ -435,7 +434,7 @@ } finally { r.close(); } - spilledStatus.set(pid, false); + partitionManager.getSpilledStatus().set(pid, false); buildRFWriters[pid] = null; return true; } @@ -450,7 +449,7 @@ private void loadDataInMemJoin() throws HyracksDataException { for (int pid = 0; pid < numOfPartitions; pid++) { - if (!spilledStatus.get(pid)) { + if (!partitionManager.getSpilledStatus().get(pid)) { bufferManager.flushPartition(pid, new IFrameWriter() { 
@Override public void open() throws HyracksDataException { @@ -496,7 +495,7 @@ int pid = probeHpc.partition(accessorProbe, i, numOfPartitions); if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase - if (spilledStatus.get(pid)) { //pid is Spilled + if (partitionManager.getSpilledStatus().get(pid)) { //pid is Spilled while (!bufferManager.insertTuple(pid, accessorProbe, i, tempPtr)) { int victim = pid; if (bufferManager.getNumTuples(pid) == 0) { // current pid is empty, choose the biggest one @@ -531,7 +530,7 @@ } private boolean isBuildRelAllInMemory() { - return spilledStatus.nextSetBit(0) < 0; + return partitionManager.getSpilledStatus().nextSetBit(0) < 0; } public void completeProbe(IFrameWriter writer) throws HyracksDataException { @@ -597,10 +596,11 @@ } public BitSet getPartitionStatus() { - return spilledStatus; + return partitionManager.getSpilledStatus(); } public void setIsReversed(boolean b) { this.isReversed = b; } + } -- To view, visit https://asterix-gerrit.ics.uci.edu/3422 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: newchange Gerrit-Change-Id: I7a5609f6709efa427769da33c3a3b93233984fd8 Gerrit-Change-Number: 3422 Gerrit-PatchSet: 1 Gerrit-Owner: Shiva Jahangiri <[email protected]>
