Repository: asterixdb Updated Branches: refs/heads/master 5714bfa8b -> aac1e936e
[NO ISSUE][TXN] Avoid boxing object creation in ConcurrentLockManager Change-Id: Iae5975eb1fde93eae1836c16e5329683dbed9bcc Reviewed-on: https://asterix-gerrit.ics.uci.edu/2582 Reviewed-by: Murtadha Hubail <mhub...@apache.org> Tested-by: Michael Blow <mb...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/aac1e936 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/aac1e936 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/aac1e936 Branch: refs/heads/master Commit: aac1e936e12f06ca4050a200325f25410a5c1b4b Parents: 5714bfa Author: Michael Blow <mb...@apache.org> Authored: Fri Apr 13 11:12:35 2018 -0400 Committer: Michael Blow <mb...@apache.org> Committed: Fri Apr 13 08:52:36 2018 -0700 ---------------------------------------------------------------------- .../src/main/resources/ArenaManager.java | 2 +- asterixdb/asterix-transactions/pom.xml | 4 + .../service/locking/ConcurrentLockManager.java | 117 ++-- .../service/locking/DumpTablePrinter.java | 6 +- .../service/locking/PrimitiveIntHashMap.java | 595 ------------------- asterixdb/pom.xml | 5 + 6 files changed, 75 insertions(+), 654 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aac1e936/asterixdb/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java b/asterixdb/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java index be295d6..1cfb804 100644 --- a/asterixdb/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java +++ b/asterixdb/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java @@ -56,7 +56,7 @@ public class @E@ArenaManager { public long allocate() { final LocalManager localManager = local.get(); final @E@RecordManager recMgr = localManager.mgr; - final int allocId = TRACK_ALLOC_ID ? (++recMgr.allocCounter % 0x7fff) : 0; + final int allocId = TRACK_ALLOC_ID ? (++recMgr.allocCounter % 0x7ffe + 1) : 1; final int localId = recMgr.allocate(); long result = TypeUtil.Global.build(localManager.arenaId, allocId, localId); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aac1e936/asterixdb/asterix-transactions/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/pom.xml b/asterixdb/asterix-transactions/pom.xml index 07dcf5d..e6e522e 100644 --- a/asterixdb/asterix-transactions/pom.xml +++ b/asterixdb/asterix-transactions/pom.xml @@ -163,5 +163,9 @@ <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> </dependency> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aac1e936/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java index c91d233..b26342e 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java @@ -22,7 +22,6 @@ package org.apache.asterix.transaction.management.service.locking; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.asterix.common.exceptions.ACIDException; @@ -36,30 +35,37 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import it.unimi.dsi.fastutil.longs.Long2LongMap; +import it.unimi.dsi.fastutil.longs.Long2LongMaps; +import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongList; + /** * A concurrent implementation of the ILockManager interface. * * @see ResourceGroupTable * @see ResourceGroup */ +@SuppressWarnings("squid:RedundantThrowsDeclarationCheck") // throws ACIDException public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent { static final Logger LOGGER = LogManager.getLogger(); static final Level LVL = Level.TRACE; - public static final boolean ENABLED_DEADLOCK_FREE_LOCKING_PROTOCOL = true; + private static final boolean ENABLED_DEADLOCK_FREE_LOCKING_PROTOCOL = true; - public static final int NIL = -1; - public static final long NILL = -1L; + private static final int NIL = -1; + private static final long NILL = -1L; - public static final boolean DEBUG_MODE = false;//true - public static final boolean CHECK_CONSISTENCY = false; + private static final boolean DEBUG_MODE = false;//true + private static final boolean CHECK_CONSISTENCY = false; - private ResourceGroupTable table; - private ResourceArenaManager resArenaMgr; - private RequestArenaManager reqArenaMgr; - private JobArenaManager jobArenaMgr; - private ConcurrentHashMap<Long, Long> txnId2TxnSlotMap; - private LockManagerStats stats = new LockManagerStats(10000); + private final ResourceGroupTable table; + private final ResourceArenaManager resArenaMgr; + private final RequestArenaManager reqArenaMgr; + private final JobArenaManager jobArenaMgr; + private final Long2LongMap txnId2TxnSlotMap; + private final LockManagerStats stats = new LockManagerStats(10000); enum LockAction { ERR(false, false), @@ -77,7 +83,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent } } - static LockAction[][] ACTION_MATRIX = { + private static final LockAction[][] ACTION_MATRIX = { // new NL IS IX S X { LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL { LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS @@ -97,7 +103,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent resArenaMgr = new ResourceArenaManager(noArenas, lockManagerShrinkTimer); reqArenaMgr = new RequestArenaManager(noArenas, lockManagerShrinkTimer); jobArenaMgr = new JobArenaManager(noArenas, lockManagerShrinkTimer); - txnId2TxnSlotMap = new ConcurrentHashMap<>(); + txnId2TxnSlotMap = Long2LongMaps.synchronize(new Long2LongOpenHashMap()); } @Override @@ -129,10 +135,10 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext); break; } - //no break + //fall-through case UPD: resArenaMgr.setMaxMode(resSlot, lockMode); - // no break + //fall-through case GET: addHolder(reqSlot, resSlot, jobSlot); locked = true; @@ -185,31 +191,35 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent void pop(); } - static class NOPTracker implements DeadlockTracker { + private static class NOPTracker implements DeadlockTracker { static final DeadlockTracker INSTANCE = new NOPTracker(); @Override public void pushResource(long resSlot) { + // no-op } @Override public void pushRequest(long reqSlot) { + // no-op } @Override public void pushJob(long jobSlot) { + // no-op } @Override public void pop() { + // no-op } } - static class CollectingTracker implements DeadlockTracker { + private static class CollectingTracker implements DeadlockTracker { static final boolean DEBUG = false; - ArrayList<Long> slots = new ArrayList<>(); + LongList slots = new LongArrayList(); ArrayList<String> types = new ArrayList<>(); @Override @@ -217,7 +227,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent types.add("Resource"); slots.add(resSlot); if (DEBUG) { - System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1)); + LOGGER.info("push " + types.get(types.size() - 1) + " " + slots.getLong(slots.size() - 1)); } } @@ -226,7 +236,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent types.add("Request"); slots.add(reqSlot); if (DEBUG) { - System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1)); + LOGGER.info("push " + types.get(types.size() - 1) + " " + slots.getLong(slots.size() - 1)); } } @@ -235,24 +245,24 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent types.add("Job"); slots.add(jobSlot); if (DEBUG) { - System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1)); + LOGGER.info("push " + types.get(types.size() - 1) + " " + slots.getLong(slots.size() - 1)); } } @Override public void pop() { if (DEBUG) { - System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1)); + LOGGER.info("pop " + types.get(types.size() - 1) + " " + slots.getLong(slots.size() - 1)); } types.remove(types.size() - 1); - slots.remove(slots.size() - 1); + slots.removeLong(slots.size() - 1); } @Override public String toString() { StringBuilder sb = new StringBuilder(); for (int i = 0; i < slots.size(); ++i) { - sb.append(types.get(i) + " " + TypeUtil.Global.toString(slots.get(i)) + "\n"); + sb.append(types.get(i)).append(" ").append(TypeUtil.Global.toString(slots.getLong(i))).append("\n"); } return sb.toString(); } @@ -270,15 +280,11 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent * @return true if a cycle would be introduced, false otherwise */ private boolean introducesDeadlock(final long resSlot, final long jobSlot, final DeadlockTracker tracker) { - if (ENABLED_DEADLOCK_FREE_LOCKING_PROTOCOL) { - /** - * Due to the deadlock-free locking protocol, deadlock is not possible. - * So, this method always returns false. - */ - return false; - } else { - return introducesDeadlock(resSlot, jobSlot, tracker, 0); - } + /* + * Due to the deadlock-free locking protocol, deadlock is not possible. + * So, this method always returns false in that case + */ + return !ENABLED_DEADLOCK_FREE_LOCKING_PROTOCOL && introducesDeadlock(resSlot, jobSlot, tracker, 0); } private boolean introducesDeadlock(final long resSlot, final long jobSlot, final DeadlockTracker tracker, @@ -298,20 +304,20 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent // The scanWaiters flag indicates if we are currently scanning the waiters (true) or the upgraders // (false). boolean scanWaiters = true; - long waiter = jobArenaMgr.getLastWaiter(holderJobSlot); - if (waiter < 0 && scanWaiters) { + long jobWaiter = jobArenaMgr.getLastWaiter(holderJobSlot); + if (jobWaiter < 0) { scanWaiters = false; - waiter = jobArenaMgr.getLastUpgrader(holderJobSlot); + jobWaiter = jobArenaMgr.getLastUpgrader(holderJobSlot); } - while (waiter >= 0) { - long waitingOnResSlot = reqArenaMgr.getResourceId(waiter); + while (jobWaiter >= 0) { + long waitingOnResSlot = reqArenaMgr.getResourceId(jobWaiter); if (introducesDeadlock(waitingOnResSlot, jobSlot, tracker, depth + 1)) { return true; } - waiter = reqArenaMgr.getNextJobRequest(waiter); - if (waiter < 0 && scanWaiters) { + jobWaiter = reqArenaMgr.getNextJobRequest(jobWaiter); + if (jobWaiter < 0 && scanWaiters) { scanWaiters = false; - waiter = jobArenaMgr.getLastUpgrader(holderJobSlot); + jobWaiter = jobArenaMgr.getLastUpgrader(holderJobSlot); } } @@ -407,7 +413,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent switch (act) { case UPD: resArenaMgr.setMaxMode(resSlot, lockMode); - // no break + //fall-through case GET: addHolder(reqSlot, resSlot, jobSlot); return true; @@ -532,8 +538,8 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent stats.releaseLocks(); long txnId = txnContext.getTxnId().getId(); - Long jobSlot = txnId2TxnSlotMap.get(txnId); - if (jobSlot == null) { + long jobSlot = txnId2TxnSlotMap.get(txnId); + if (jobSlot == 0) { // we don't know the job, so there are no locks for it - we're done return; } @@ -565,15 +571,15 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent } private long findOrAllocJobSlot(long txnId) { - Long jobSlot = txnId2TxnSlotMap.get(txnId); - if (jobSlot == null) { - jobSlot = new Long(jobArenaMgr.allocate()); + long jobSlot = txnId2TxnSlotMap.get(txnId); + if (jobSlot == 0) { + jobSlot = jobArenaMgr.allocate(); if (DEBUG_MODE) { LOGGER.trace("new job slot " + TypeUtil.Global.toString(jobSlot) + " (" + txnId + ")"); } jobArenaMgr.setTxnId(jobSlot, txnId); - Long oldSlot = txnId2TxnSlotMap.putIfAbsent(txnId, jobSlot); - if (oldSlot != null) { + long oldSlot = txnId2TxnSlotMap.putIfAbsent(txnId, jobSlot); + if (oldSlot != 0) { // if another thread allocated a slot for this jobThreadId between // get(..) and putIfAbsent(..), we'll use that slot and // deallocate the one we allocated @@ -584,7 +590,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent jobSlot = oldSlot; } } - assert (jobSlot >= 0); + assert jobSlot > 0; return jobSlot; } @@ -754,7 +760,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent void remove(long request, long resource, long job); } - final Queue waiter = new Queue() { + private final Queue waiter = new Queue() { @Override public void add(long request, long resource, long job) { long waiter = resArenaMgr.getFirstWaiter(resource); @@ -788,7 +794,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent } }; - final Queue upgrader = new Queue() { + private final Queue upgrader = new Queue() { @Override public void add(long request, long resource, long job) { long upgrader = resArenaMgr.getFirstUpgrader(resource); @@ -987,6 +993,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent } } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IllegalStateException("interrupted", e); } } @@ -1015,8 +1022,8 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent * @return the slot of the request, if the lock request is found, NILL otherwise */ private long findLockInJobQueue(final int dsId, final int entityHashValue, final long txnId, byte lockMode) { - Long jobSlot = txnId2TxnSlotMap.get(txnId); - if (jobSlot == null) { + long jobSlot = txnId2TxnSlotMap.get(txnId); + if (jobSlot == 0) { return NILL; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aac1e936/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java index 26261c2..6a5372e 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java @@ -19,17 +19,17 @@ package org.apache.asterix.transaction.management.service.locking; -import java.util.concurrent.ConcurrentHashMap; +import it.unimi.dsi.fastutil.longs.Long2LongMap; public class DumpTablePrinter implements TablePrinter { private ResourceGroupTable table; private ResourceArenaManager resArenaMgr; private RequestArenaManager reqArenaMgr; private JobArenaManager jobArenaMgr; - private ConcurrentHashMap<Long, Long> txnIdToJobSlotMap; + private Long2LongMap txnIdToJobSlotMap; DumpTablePrinter(ResourceGroupTable table, ResourceArenaManager resArenaMgr, RequestArenaManager reqArenaMgr, - JobArenaManager jobArenaMgr, ConcurrentHashMap<Long, Long> txnIdToJobSlotMap) { + JobArenaManager jobArenaMgr, Long2LongMap txnIdToJobSlotMap) { this.table = table; this.resArenaMgr = resArenaMgr; this.reqArenaMgr = reqArenaMgr; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aac1e936/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java deleted file mode 100644 index 5bf5ad6..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java +++ /dev/null @@ -1,595 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.transaction.management.service.locking; - -import java.util.ArrayList; - -/** - * PrimitiveIntHashMap supports primitive int type as key and value. - * The hash map grows when the available slots in a bucket are overflowed. - * Also, the hash map shrinks according to the following shrink policy. - * : Shrink when the resource under-utilization lasts for a certain threshold time. - * - * @author kisskys - */ -public class PrimitiveIntHashMap { - private final int CHILD_BUCKETS; //INIT_NUM_OF_BUCKETS; - private final int NUM_OF_SLOTS; //NUM_OF_SLOTS_IN_A_BUCKET; - private final int SHRINK_TIMER_THRESHOLD; - - private int occupiedSlots; - private ArrayList<ChildIntArrayManager> pArray; //parent array - private int hashMod; - private long shrinkTimer; - private boolean isShrinkTimerOn; - private int iterBucketIndex; - private int iterSlotIndex; - private int iterChildIndex; - private KeyValuePair iterPair; - - // //////////////////////////////////////////////// - // // begin of unit test - // //////////////////////////////////////////////// - // - // /** - // * @param args - // */ - // public static void main(String[] args) { - // int i, j; - // int k = 0; - // int num = 5; - // int key[] = new int[500]; - // int val[] = new int[500]; - // KeyValuePair pair; - // PrimitiveIntHashMap map = new PrimitiveIntHashMap(1<<4, 1<<3, 5); - // - // for (j=0; j < num; j++) { - // - // k += 100; - // //generate data - // for (i=0; i < k; i++) { - // key[i] = i; - // val[i] = i; - // } - // - // //put data to map - // for (i=0; i < k-30; i++) { - // map.put(key[i], val[i]); - // } - // - // //put data to map - // for (i=0; i < k-30; i++) { - // map.put(key[i], val[i]); - // } - // - // map.beginIterate(); - // pair = map.getNextKeyValue(); - // i = 0; - // while (pair != null) { - // i++; - // System.out.println("["+i+"] key:"+ pair.key + ", val:"+ pair.value); - // pair = map.getNextKeyValue(); - // } - // - // //System.out.println(map.prettyPrint()); - // - // for (i=k-20; i< k; i++) { //skip X70~X79 - // map.put(key[i], val[i]); - // } - // - // System.out.println(map.prettyPrint()); - // - // //remove data to map - // for (i=0; i < k-10; i++) { - // map.remove(key[i]); - // try { - // Thread.currentThread().sleep(1); - // } catch (InterruptedException e) { - // e.printStackTrace(); - // } - // } - // - // map.beginIterate(); - // pair = map.getNextKeyValue(); - // i = 0; - // while (pair != null) { - // i++; - // System.out.println("["+i+"] key:"+ pair.key + ", val:"+ pair.value); - // pair = map.getNextKeyValue(); - // } - // - // //remove data to map - // for (i=0; i < k-10; i++) { - // map.remove(key[i]); - // try { - // Thread.currentThread().sleep(1); - // } catch (InterruptedException e) { - // // TODO Auto-generated catch block - // e.printStackTrace(); - // } - // } - // - // System.out.println(map.prettyPrint()); - // - // //get data from map - // for (i=0; i < k; i++) { - // System.out.println(""+i+"=> key:"+ key[i] + ", val:"+val[i] +", result: " + map.get(key[i])); - // } - // } - // - // map.beginIterate(); - // pair = map.getNextKeyValue(); - // i = 0; - // while (pair != null) { - // i++; - // System.out.println("["+i+"] key:"+ pair.key + ", val:"+ pair.value); - // pair = map.getNextKeyValue(); - // } - // } - // - // //////////////////////////////////////////////// - // // end of unit test - // //////////////////////////////////////////////// - - public PrimitiveIntHashMap() { - CHILD_BUCKETS = 1 << 9; //INIT_NUM_OF_BUCKETS; - NUM_OF_SLOTS = 1 << 3; //NUM_OF_SLOTS_IN_A_BUCKET; - SHRINK_TIMER_THRESHOLD = 120000; //2min - pArray = new ArrayList<ChildIntArrayManager>(); - pArray.add(new ChildIntArrayManager(this)); - hashMod = CHILD_BUCKETS; - occupiedSlots = 0; - iterPair = new KeyValuePair(); - } - - public PrimitiveIntHashMap(int childBuckets, int numOfSlots, int shrinkTimerThreshold) { - CHILD_BUCKETS = childBuckets; - NUM_OF_SLOTS = numOfSlots; - SHRINK_TIMER_THRESHOLD = shrinkTimerThreshold; - pArray = new ArrayList<ChildIntArrayManager>(); - pArray.add(new ChildIntArrayManager(this)); - hashMod = CHILD_BUCKETS; - occupiedSlots = 0; - iterPair = new KeyValuePair(); - } - - public void put(int key, int value) { - int growCount = 0; - int bucketNum = hash(key); - ChildIntArrayManager child = pArray.get(bucketNum / CHILD_BUCKETS); - while (child.isFull(bucketNum % CHILD_BUCKETS)) { - growHashMap(); - bucketNum = hash(key); - child = pArray.get(bucketNum / CHILD_BUCKETS); - if (growCount > 2) { - //changeHashFunc(); - } - growCount++; - } - occupiedSlots += child.put(bucketNum % CHILD_BUCKETS, key, value, false); - } - - public void upsert(int key, int value) { - int growCount = 0; - int bucketNum = hash(key); - ChildIntArrayManager child = pArray.get(bucketNum / CHILD_BUCKETS); - while (child.isFull(bucketNum % CHILD_BUCKETS)) { - growHashMap(); - bucketNum = hash(key); - child = pArray.get(bucketNum / CHILD_BUCKETS); - if (growCount > 2) { - //changeHashFunc(); - } - growCount++; - } - occupiedSlots += child.put(bucketNum % CHILD_BUCKETS, key, value, true); - } - - private int hash(int key) { - return key % hashMod; - } - - private void growHashMap() { - int size = pArray.size(); - int i; - - //grow buckets by adding more child - for (i = 0; i < size; i++) { - pArray.add(new ChildIntArrayManager(this)); - } - - //increase hashMod - hashMod *= 2; - - //re-hash - rehash(0, size, hashMod / 2); - } - - private void shrinkHashMap() { - int size = pArray.size(); - int i; - - //decrease hashMod - hashMod /= 2; - - //re-hash - rehash(size / 2, size, hashMod * 2); - - //shrink buckets by removing child(s) - for (i = size - 1; i >= size / 2; i--) { - pArray.remove(i); - } - } - - private void rehash(int begin, int end, int oldHashMod) { - int i, j, k; - int key, value; - ChildIntArrayManager child; - - //re-hash - for (i = begin; i < end; i++) { - child = pArray.get(i); - for (j = 0; j < CHILD_BUCKETS; j++) { - if (child.cArray[j][0] == 0) { - continue; - } - for (k = 1; k < NUM_OF_SLOTS; k++) { - //if the hashValue of the key is different, then re-hash it. - key = child.cArray[j][k * 2]; - if (hash(key) != key % oldHashMod) { - value = child.cArray[j][k * 2 + 1]; - //remove existing key and value - //Notice! To avoid bucket iteration, child.remove() is not used. - child.cArray[j][k * 2] = -1; - child.cArray[j][0]--; - //re-hash it - pArray.get(hash(key) / CHILD_BUCKETS).put(hash(key) % CHILD_BUCKETS, key, value, false); - } - } - } - } - } - - // private void changeHashFunc() { - // //TODO need to implement. - // throw new UnsupportedOperationException("changeHashFunc() not implemented"); - // } - - public int get(int key) { - int bucketNum = hash(key); - return pArray.get(bucketNum / CHILD_BUCKETS).get(bucketNum % CHILD_BUCKETS, key); - } - - public void remove(int key) { - int bucketNum = hash(key); - occupiedSlots -= pArray.get(bucketNum / CHILD_BUCKETS).remove(bucketNum % CHILD_BUCKETS, key); - - if (needShrink()) { - shrinkHashMap(); - } - } - - /** - * Shrink policy: - * Shrink when the resource under-utilization lasts for a certain amount of time. - * - * @return - */ - private boolean needShrink() { - int size = pArray.size(); - int usedSlots = occupiedSlots; - if (usedSlots == 0) { - usedSlots = 1; - } - if (size > 1 && size * CHILD_BUCKETS * NUM_OF_SLOTS / usedSlots >= 3 && isSafeToShrink()) { - if (isShrinkTimerOn) { - if (System.currentTimeMillis() - shrinkTimer >= SHRINK_TIMER_THRESHOLD) { - isShrinkTimerOn = false; - return true; - } - } else { - //turn on timer - isShrinkTimerOn = true; - shrinkTimer = System.currentTimeMillis(); - } - } else { - //turn off timer - isShrinkTimerOn = false; - } - return false; - } - - private boolean isSafeToShrink() { - int i, j; - int size = pArray.size(); - //Child: 0, 1, 2, 3, 4, 5, 6, 7 - //[HChild(Head Child):0 and TChild(Tail Child): 4], [1(H),5(T)], [2(H),6(T)] and so on. - //When the map shrinks, the sum of occupied slots in H/TChild should not exceed the NUM_OF_SLOTS-1. - //Then it is safe to shrink. Otherwise, unsafe. - ChildIntArrayManager HChild, TChild; - - for (i = 0; i < size / 2; i++) { - HChild = pArray.get(i); - TChild = pArray.get(size / 2 + i); - for (j = 0; j < CHILD_BUCKETS; j++) { - if (HChild.cArray[j][0] + TChild.cArray[j][0] > NUM_OF_SLOTS - 1) { - return false; - } - } - } - return true; - } - - public String prettyPrint() { - StringBuilder s = new StringBuilder("\n########### PrimitiveIntHashMap Status #############\n"); - ChildIntArrayManager child; - int i, j, k; - int size = pArray.size(); - for (i = 0; i < size; i++) { - child = pArray.get(i); - s.append("child[").append(i).append("]\n"); - for (j = 0; j < CHILD_BUCKETS; j++) { - s.append(j).append(" "); - for (k = 0; k < NUM_OF_SLOTS; k++) { - s.append("[").append(child.cArray[j][k * 2]).append(",").append(child.cArray[j][k * 2 + 1]) - .append("] "); - } - s.append("\n"); - } - } - return s.toString(); - } - - public int getNumOfSlots() { - return NUM_OF_SLOTS; - } - - public int getNumOfChildBuckets() { - return CHILD_BUCKETS; - } - - public void clear(boolean needShrink) { - int size = pArray.size(); - for (int i = size - 1; i >= 0; i--) { - if (needShrink && i != 0) { - pArray.remove(i); - } else { - pArray.get(i).clear(); - } - } - occupiedSlots = 0; - } - - /////////////////////////////////////// - // iterate method - /////////////////////////////////////// - - public void beginIterate() { - iterChildIndex = 0; - iterBucketIndex = 0; - iterSlotIndex = 1; - } - - public KeyValuePair getNextKeyValue() { - for (; iterChildIndex < pArray.size(); iterChildIndex++, iterBucketIndex = 0) { - for (; iterBucketIndex < CHILD_BUCKETS; iterBucketIndex++, iterSlotIndex = 1) { - if (iterSlotIndex == 1 && pArray.get(iterChildIndex).cArray[iterBucketIndex][0] == 0) { - continue; - } - for (; iterSlotIndex < NUM_OF_SLOTS; iterSlotIndex++) { - iterPair.key = pArray.get(iterChildIndex).cArray[iterBucketIndex][iterSlotIndex * 2]; - if (iterPair.key == -1) { - continue; - } - iterPair.value = pArray.get(iterChildIndex).cArray[iterBucketIndex][iterSlotIndex * 2 + 1]; - iterSlotIndex++; - return iterPair; - } - } - } - return null; - } - - public int getNextKey() { - for (; iterChildIndex < pArray.size(); iterChildIndex++, iterBucketIndex = 0) { - for (; iterBucketIndex < CHILD_BUCKETS; iterBucketIndex++, iterSlotIndex = 1) { - if (iterSlotIndex == 1 && pArray.get(iterChildIndex).cArray[iterBucketIndex][0] == 0) { - continue; - } - for (; iterSlotIndex < NUM_OF_SLOTS; iterSlotIndex++) { - iterPair.key = pArray.get(iterChildIndex).cArray[iterBucketIndex][iterSlotIndex * 2]; - if (iterPair.key == -1) { - continue; - } - iterSlotIndex++; - return iterPair.key; - } - } - } - return -1; - } - - public int getNextValue() { - for (; iterChildIndex < pArray.size(); iterChildIndex++, iterBucketIndex = 0) { - for (; iterBucketIndex < CHILD_BUCKETS; iterBucketIndex++, iterSlotIndex = 1) { - if (iterSlotIndex == 1 && pArray.get(iterChildIndex).cArray[iterBucketIndex][0] == 0) { - continue; - } - for (; iterSlotIndex < NUM_OF_SLOTS; iterSlotIndex++) { - iterPair.key = pArray.get(iterChildIndex).cArray[iterBucketIndex][iterSlotIndex * 2]; - if (iterPair.key == -1) { - continue; - } - iterPair.value = pArray.get(iterChildIndex).cArray[iterBucketIndex][iterSlotIndex * 2 + 1]; - iterSlotIndex++; - return iterPair.value; - } - } - } - return -1; - } - - public static class KeyValuePair { - public int key; - public int value; - } -} - -class ChildIntArrayManager { - private final int DIM1_SIZE; - private final int DIM2_SIZE; - private final int NUM_OF_SLOTS; - public int[][] cArray; //child array - - public ChildIntArrayManager(PrimitiveIntHashMap parentHashMap) { - DIM1_SIZE = parentHashMap.getNumOfChildBuckets(); - DIM2_SIZE = parentHashMap.getNumOfSlots() * 2; //2: Array of [key, value] pair - NUM_OF_SLOTS = parentHashMap.getNumOfSlots(); - initialize(); - } - - private void initialize() { - cArray = new int[DIM1_SIZE][DIM2_SIZE]; - int i, j; - for (i = 0; i < DIM1_SIZE; i++) { - //cArray[i][0] is used as a counter to count how many slots are used in this bucket. - //cArray[i][1] is not used. - cArray[i][0] = 0; - for (j = 1; j < NUM_OF_SLOTS; j++) { - cArray[i][j * 2] = -1; // -1 represent that the slot is empty - } - } - } - - public void clear() { - int i, j; - for (i = 0; i < DIM1_SIZE; i++) { - //cArray[i][0] is used as a counter to count how many slots are used in this bucket. - //cArray[i][1] is not used. - if (cArray[i][0] == 0) { - continue; - } - cArray[i][0] = 0; - for (j = 1; j < NUM_OF_SLOTS; j++) { - cArray[i][j * 2] = -1; // -1 represent that the slot is empty - } - } - } - - public void deinitialize() { - cArray = null; - } - - public void allocate() { - initialize(); - } - - public boolean isFull(int bucketNum) { - return cArray[bucketNum][0] == NUM_OF_SLOTS - 1; - } - - public boolean isEmpty(int bucketNum) { - return cArray[bucketNum][0] == 0; - } - - /** - * Put key,value into a slot in the bucket if the key doesn't exist. - * Update value if the key exists and if isUpsert is true - * No need to call get() to check the existence of the key before calling put(). - * Notice! Caller should make sure that there is an available slot. - * - * @param bucketNum - * @param key - * @param value - * @param isUpsert - * @return 1 for new insertion, 0 for key duplication - */ - public int put(int bucketNum, int key, int value, boolean isUpsert) { - int i; - int emptySlot = -1; - - if (cArray[bucketNum][0] == 0) { - cArray[bucketNum][2] = key; - cArray[bucketNum][3] = value; - cArray[bucketNum][0]++; - return 1; - } - - for (i = 1; i < NUM_OF_SLOTS; i++) { - if (cArray[bucketNum][i * 2] == key) { - if (isUpsert) { - cArray[bucketNum][i * 2 + 1] = value; - } - return 0; - } else if (cArray[bucketNum][i * 2] == -1) { - emptySlot = i; - } - } - - if (emptySlot == -1) { - throw new UnsupportedOperationException("error"); - } - - cArray[bucketNum][emptySlot * 2] = key; - cArray[bucketNum][emptySlot * 2 + 1] = value; - cArray[bucketNum][0]++; - return 1; - } - - public int get(int bucketNum, int key) { - int i; - - if (cArray[bucketNum][0] == 0) { - return -1; - } - - for (i = 1; i < NUM_OF_SLOTS; i++) { - if (cArray[bucketNum][i * 2] == key) { - return cArray[bucketNum][i * 2 + 1]; - } - } - return -1; - } - - /** - * remove key if it exists. Otherwise, ignore it. - * - * @param bucketNum - * @param key - * @return 1 for success, 0 if the key doesn't exist - */ - public int remove(int bucketNum, int key) { - int i; - - if (cArray[bucketNum][0] == 0) { - return 0; - } - - for (i = 1; i < NUM_OF_SLOTS; i++) { - if (cArray[bucketNum][i * 2] == key) { - cArray[bucketNum][i * 2] = -1; - cArray[bucketNum][0]--; - return 1; - } - } - - return 0; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aac1e936/asterixdb/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml index 6b2dfb4..cc08e96 100644 --- a/asterixdb/pom.xml +++ b/asterixdb/pom.xml @@ -1207,6 +1207,11 @@ <artifactId>commons-codec</artifactId> <version>1.9</version> </dependency> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + <version>8.1.1</version> + </dependency> </dependencies> </dependencyManagement>