IGNITE-7638 Implemented robin-hood hashing for FullPageIdTable - Fixes #3481.
Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f8ec79d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f8ec79d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f8ec79d Branch: refs/heads/master Commit: 1f8ec79d22305fb5a6e28c9c9fa8efa2a2e391bf Parents: ac3d545 Author: dpavlov <dpav...@gridgain.com> Authored: Tue Mar 13 16:18:54 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Tue Mar 13 16:18:54 2018 +0300 ---------------------------------------------------------------------- modules/core/pom.xml | 7 + .../apache/ignite/IgniteSystemProperties.java | 7 + .../GridCacheDatabaseSharedManager.java | 18 +- .../pagemem/CheckpointMetricsTracker.java | 2 +- .../persistence/pagemem/EvictCandidate.java | 77 --- .../persistence/pagemem/FullPageIdTable.java | 358 +++++----- .../persistence/pagemem/LoadedPagesMap.java | 137 ++++ .../cache/persistence/pagemem/PageMemoryEx.java | 8 +- .../persistence/pagemem/PageMemoryImpl.java | 192 +++--- .../persistence/pagemem/ReplaceCandidate.java | 78 +++ .../pagemem/RobinHoodBackwardShiftHashMap.java | 654 +++++++++++++++++++ .../checkpoint/IgniteMassLoadSandboxTest.java | 2 +- .../pagemem/FullPageIdTableTest.java | 195 +++++- .../RobinHoodBackwardShiftHashMapTest.java | 430 ++++++++++++ .../testsuites/IgnitePdsUnitTestSuite.java | 7 +- 15 files changed, 1812 insertions(+), 360 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/core/pom.xml b/modules/core/pom.xml index 68c19a7..e56d141 100644 --- a/modules/core/pom.xml +++ b/modules/core/pom.xml @@ -124,6 +124,13 @@ </dependency> <dependency> + <groupId>org.hamcrest</groupId> + 
<artifactId>hamcrest-library</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <version>1.9.5</version> http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index b1a1542..11419ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -815,12 +815,19 @@ public final class IgniteSystemProperties { * Because other thread may require exactly the same page to be loaded from store, reads are protected by locking. */ public static final String IGNITE_DELAYED_REPLACED_PAGE_WRITE = "IGNITE_DELAYED_REPLACED_PAGE_WRITE"; + /** * When set to {@code true}, WAL implementation with dedicated worker will be used even in FSYNC mode. * Default is {@code false}. */ public static final String IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER = "IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER"; + /** + * When set to {@code false}, loaded pages implementation is switched to previous version of implementation, + * FullPageIdTable. {@code True} value enables 'Robin Hood hashing: backward shift deletion'. + * Default is {@code true}. + */ + public static final String IGNITE_LOADED_PAGES_BACKWARD_SHIFT_MAP = "IGNITE_LOADED_PAGES_BACKWARD_SHIFT_MAP"; /** * Enforces singleton. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index d4ba4ef..d6a8a30 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -113,8 +113,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; @@ -141,7 +139,6 @@ import org.apache.ignite.internal.util.lang.GridInClosure3X; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.P3; import org.apache.ignite.internal.util.typedef.T2; import 
org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -1251,11 +1248,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan for (Map.Entry<PageMemoryEx, Collection<Integer>> entry : destroyed.entrySet()) { final Collection<Integer> grpIds = entry.getValue(); - clearFuts.add(entry.getKey().clearAsync(new P3<Integer, Long, Integer>() { - @Override public boolean apply(Integer grpId, Long pageId, Integer tag) { - return grpIds.contains(grpId); - } - }, false)); + clearFuts.add(entry.getKey().clearAsync((grpId, pageIdg) -> grpIds.contains(grpId), false)); } for (IgniteInternalFuture<Void> clearFut : clearFuts) { @@ -1985,12 +1978,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan PageMemoryEx pageMem = gId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(gId); - pageMem.clearAsync(new P3<Integer, Long, Integer>() { - @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) { - return cacheId == gId && PageIdUtils.partId(pageId) == pId; - } - }, true).get(); - + pageMem.clearAsync( + (grpId, pageId) -> grpId == gId && PageIdUtils.partId(pageId) == pId, + true).get(); } break; http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java index e533f41..dbe0f40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java @@ -70,7 +70,7 @@ public class CheckpointMetricsTracker { private long cpEnd; /** - * + * Increments counter if copy on write page was written. */ public void onCowPageWritten() { COW_PAGES_UPDATER.incrementAndGet(this); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/EvictCandidate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/EvictCandidate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/EvictCandidate.java deleted file mode 100644 index eb172c9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/EvictCandidate.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.persistence.pagemem; - -import org.apache.ignite.internal.pagemem.FullPageId; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * - */ -public class EvictCandidate { - /** */ - private int tag; - - /** */ - @GridToStringExclude - private long relPtr; - - /** */ - @GridToStringInclude - private FullPageId fullId; - - /** - * @param tag Tag. - * @param relPtr Relative pointer. - * @param fullId Full page ID. - */ - public EvictCandidate(int tag, long relPtr, FullPageId fullId) { - this.tag = tag; - this.relPtr = relPtr; - this.fullId = fullId; - } - - /** - * @return Tag. - */ - public int tag() { - return tag; - } - - /** - * @return Relative pointer. - */ - public long relativePointer() { - return relPtr; - } - - /** - * @return Index. 
- */ - public FullPageId fullId() { - return fullId; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(EvictCandidate.class, this, "relPtr", U.hexLong(relPtr)); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/FullPageIdTable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/FullPageIdTable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/FullPageIdTable.java index 19d26ff..7fabfe0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/FullPageIdTable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/FullPageIdTable.java @@ -17,13 +17,14 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; +import java.util.function.BiConsumer; import org.apache.ignite.internal.mem.IgniteOutOfMemoryException; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridUnsafe; -import org.apache.ignite.internal.util.lang.GridPredicate3; +import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_LONG_HASH_MAP_LOAD_FACTOR; import static org.apache.ignite.IgniteSystemProperties.getFloat; @@ -31,7 +32,7 @@ import static org.apache.ignite.IgniteSystemProperties.getFloat; /** * */ -public class FullPageIdTable { +public class FullPageIdTable implements LoadedPagesMap { /** Load factor. 
*/ private static final float LOAD_FACTOR = getFloat(IGNITE_LONG_LONG_HASH_MAP_LOAD_FACTOR, 2.5f); @@ -68,6 +69,15 @@ public class FullPageIdTable { /** */ private static final int OUTDATED = -3; + /** Tag offset from entry base. */ + private static final int TAG_OFFSET = 4; + + /** Page id offset from entry base. */ + private static final int PAGE_ID_OFFSET = 8; + + /** Value offset from entry base. */ + private static final int VALUE_OFFSET = 16; + /** Max size, in elements. */ protected int capacity; @@ -102,31 +112,21 @@ public class FullPageIdTable { clear(); } - /** - * @return Current number of entries in the map. - */ - public final int size() { + /** {@inheritDoc} */ + @Override public final int size() { return GridUnsafe.getInt(valPtr); } - /** - * @return Maximum number of entries in the map. This maximum can not be always reached. - */ - public final int capacity() { + /** {@inheritDoc} */ + @Override public final int capacity() { return capacity; } - /** - * Gets value associated with the given key. - * - * @param cacheId Cache ID. - * @param pageId Page ID. - * @return A value associated with the given key. - */ - public long get(int cacheId, long pageId, int tag, long absent, long outdated) { - assert assertKey(cacheId, pageId); + /** {@inheritDoc} */ + @Override public long get(int grpId, long pageId, int reqVer, long absent, long outdated) { + assert assertKey(grpId, pageId); - int idx = getKey(cacheId, pageId, tag, false); + int idx = getKey(grpId, pageId, reqVer, false); if (idx == -1) return absent; @@ -137,112 +137,97 @@ public class FullPageIdTable { return valueAt(idx); } - /** - * Refresh outdated value. - * - * @param cacheId Cache ID. - * @param pageId Page ID. - * @param tag Partition tag. - * @return A value associated with the given key. 
- */ - public long refresh(int cacheId, long pageId, int tag) { - assert assertKey(cacheId, pageId); + /** {@inheritDoc} */ + @Override public long refresh(int grpId, long pageId, int ver) { + assert assertKey(grpId, pageId); - int idx = getKey(cacheId, pageId, tag, true); + int idx = getKey(grpId, pageId, ver, true); - assert idx >= 0 : "[idx=" + idx + ", tag=" + tag + ", cacheId=" + cacheId + - ", pageId=" + U.hexLong(pageId) + ']'; + if (!(idx >= 0) || !(tagAt(idx) < ver)) { + A.ensure(idx >= 0, "[idx=" + idx + ", tag=" + ver + ", cacheId=" + grpId + + ", pageId=" + U.hexLong(pageId) + ']'); - assert tagAt(idx) < tag : "[idx=" + idx + ", tag=" + tag + ", cacheId=" + cacheId + - ", pageId=" + U.hexLong(pageId) + ", tagAtIdx=" + tagAt(idx) + ']'; + A.ensure(tagAt(idx) < ver, "[idx=" + idx + ", tag=" + ver + ", cacheId=" + grpId + + ", pageId=" + U.hexLong(pageId) + ", tagAtIdx=" + tagAt(idx) + ']'); + } - setTagAt(idx, tag); + setTagAt(idx, ver); return valueAt(idx); } - /** - * Associates the given key with the given value. - * @param cacheId Cache ID - * @param pageId Page ID. - * @param value Value to set. - */ - public void put(int cacheId, long pageId, long value, int tag) { - assert assertKey(cacheId, pageId); + /** {@inheritDoc} */ + @Override public void put(int grpId, long pageId, long val, int ver) { + assert assertKey(grpId, pageId); - int index = putKey(cacheId, pageId, tag); + int idx = putKey(grpId, pageId, ver); - setValueAt(index, value); + setValueAt(idx, val); } - /** - * Removes key-value association for the given key. - * - * @param grpId Cache group ID. - * @param pageId Page ID. 
- */ - public void remove(int grpId, long pageId, int tag) { + /** {@inheritDoc} */ + @Override public boolean remove(int grpId, long pageId) { assert assertKey(grpId, pageId); - int index = removeKey(grpId, pageId, tag); + int idx = removeKey(grpId, pageId); + + boolean valRmv = idx >= 0; + + if (valRmv) + setValueAt(idx, 0); - if (index >= 0) - setValueAt(index, 0); + return valRmv; } - /** - * Find nearest value from specified position to the right. - * - * @param idx Index to start searching from. - * @param absent Default value that will be returned if no values present. - * @return Closest value to the index and it's partition tag or {@code absent} and -1 if no values found. - */ - public EvictCandidate getNearestAt(final int idx, final long absent) { - for (int i = idx; i < capacity + idx; i++) { - final int idx2 = i >= capacity ? i - capacity : i; + /** {@inheritDoc} */ + @Override public ReplaceCandidate getNearestAt(final int idxStart) { + for (int i = idxStart; i < capacity + idxStart; i++) { + final int idx2 = normalizeIndex(i); if (isValuePresentAt(idx2)) { long base = entryBase(idx2); - int cacheId = GridUnsafe.getInt(base); - int tag = GridUnsafe.getInt(base + 4); - long pageId = GridUnsafe.getLong(base + 8); - long val = GridUnsafe.getLong(base + 16); + int grpId = GridUnsafe.getInt(base); + int tag = GridUnsafe.getInt(base + TAG_OFFSET); + long pageId = GridUnsafe.getLong(base + PAGE_ID_OFFSET); + long val = GridUnsafe.getLong(base + VALUE_OFFSET); - return new EvictCandidate(tag, val, new FullPageId(pageId, cacheId)); + return new ReplaceCandidate(tag, val, new FullPageId(pageId, grpId)); } } return null; } - /** - * @param idx Index to clear value at. - * @param pred Test predicate. - * @param absent Value to return if the cell is empty. - * @return Value at the given index. 
- */ - public long clearAt(int idx, GridPredicate3<Integer, Long, Integer> pred, long absent) { - long base = entryBase(idx); + /** {@inheritDoc} */ + @Override public GridLongList removeIf(int startIdxToClear, int endIdxToClear, KeyPredicate keyPred) { + assert endIdxToClear > startIdxToClear + : "Start and end indexes are not consistent: {" + startIdxToClear + ", " + endIdxToClear + "}"; - int grpId = GridUnsafe.getInt(base); - int tag = GridUnsafe.getInt(base + 4); - long pageId = GridUnsafe.getLong(base + 8); + int sz = endIdxToClear - startIdxToClear; - if ((pageId == REMOVED_PAGE_ID && grpId == REMOVED_CACHE_GRP_ID) - || (pageId == EMPTY_PAGE_ID && grpId == EMPTY_CACHE_GRP_ID)) - return absent; + GridLongList list = new GridLongList(sz); + + for (int idx = startIdxToClear; idx < endIdxToClear; idx++) { + long base = entryBase(idx); + + int grpId = GridUnsafe.getInt(base); + long pageId = GridUnsafe.getLong(base + PAGE_ID_OFFSET); + + if (isRemoved(grpId, pageId) || isEmpty(grpId, pageId)) + continue; + + if (!keyPred.test(grpId, pageId)) + continue; - if (pred.apply(grpId, pageId, tag)) { long res = valueAt(idx); - setKeyAt(idx, REMOVED_CACHE_GRP_ID, REMOVED_PAGE_ID); - setValueAt(idx, 0); + setRemoved(idx); - return res; + list.add(res); } - else - return absent; + + return list; } /** @@ -253,50 +238,50 @@ public class FullPageIdTable { private int putKey(int cacheId, long pageId, int tag) { int step = 1; - int index = U.safeAbs(FullPageId.hashCode(cacheId, pageId)) % capacity; + int idx = U.safeAbs(FullPageId.hashCode(cacheId, pageId)) % capacity; - int foundIndex = -1; + int foundIdx = -1; int res; do { - res = testKeyAt(index, cacheId, pageId, tag); + res = testKeyAt(idx, cacheId, pageId, tag); if (res == OUTDATED) { - foundIndex = index; + foundIdx = idx; break; } else if (res == EMPTY) { - if (foundIndex == -1) - foundIndex = index; + if (foundIdx == -1) + foundIdx = idx; break; } else if (res == REMOVED) { // Must continue search to the first empty 
slot to ensure there are no duplicate mappings. - if (foundIndex == -1) - foundIndex = index; + if (foundIdx == -1) + foundIdx = idx; } else if (res == EQUAL) - return index; + return idx; else assert res == NOT_EQUAL; - index++; + idx++; - if (index >= capacity) - index -= capacity; + if (idx >= capacity) + idx -= capacity; } while (++step <= maxSteps); - if (foundIndex != -1) { - setKeyAt(foundIndex, cacheId, pageId); - setTagAt(foundIndex, tag); + if (foundIdx != -1) { + setKeyAt(foundIdx, cacheId, pageId); + setTagAt(foundIdx, tag); if (res != OUTDATED) incrementSize(); - return foundIndex; + return foundIdx; } throw new IgniteOutOfMemoryException("No room for a new key"); @@ -305,6 +290,7 @@ public class FullPageIdTable { /** * @param cacheId Cache ID. * @param pageId Page ID. + * @param refresh Refresh. * @return Key index. */ private int getKey(int cacheId, long pageId, int tag, boolean refresh) { @@ -335,22 +321,97 @@ public class FullPageIdTable { } /** + * @param grpId Cache group ID. + * @param pageId Page ID. + * @return {@code true} if group & page id indicates cell has state 'Removed'. + */ + private boolean isRemoved(int grpId, long pageId) { + return pageId == REMOVED_PAGE_ID && grpId == REMOVED_CACHE_GRP_ID; + } + + /** + * @param idx cell index, normalized. + * @return {@code true} if cell with index idx has state 'Removed'. + */ + private boolean isRemoved(int idx) { + long base = entryBase(idx); + + int grpId = GridUnsafe.getInt(base); + long pageId = GridUnsafe.getLong(base + PAGE_ID_OFFSET); + + return isRemoved(grpId, pageId); + } + + /** + * Sets cell state to 'Removed' and clears the cell value. + * @param idx cell index, normalized. + */ + private void setRemoved(int idx) { + setKeyAt(idx, REMOVED_CACHE_GRP_ID, REMOVED_PAGE_ID); + + setValueAt(idx, 0); + } + + /** + * @param grpId Cache group ID. + * @param pageId Page ID. + * @return {@code true} if group & page id indicates cell has state 'Empty'.
+ */ + private boolean isEmpty(int grpId, long pageId) { + return pageId == EMPTY_PAGE_ID && grpId == EMPTY_CACHE_GRP_ID; + } + + /** + * @param idx cell index, normalized. + * @return {@code true} if cell with index idx has state 'Empty'. + */ + private boolean isEmpty(int idx) { + long base = entryBase(idx); + + int grpId = GridUnsafe.getInt(base); + long pageId = GridUnsafe.getLong(base + PAGE_ID_OFFSET); + + return isEmpty(grpId, pageId); + } + + /** + * Sets cell state to 'Empty'. + * + * @param idx cell index, normalized. + */ + private void setEmpty(int idx) { + setKeyAt(idx, EMPTY_CACHE_GRP_ID, EMPTY_PAGE_ID); + + setValueAt(idx, 0); + } + + /** + * @param i index, possibly outside the internal array of cells. + * @return corresponding index inside cells array. + */ + private int normalizeIndex(int i) { + assert i < 2 * capacity; + + return i < capacity ? i : i - capacity; + } + + /** * @param cacheId Cache ID. * @param pageId Page ID. * @return Key index. */ - private int removeKey(int cacheId, long pageId, int tag) { + private int removeKey(int cacheId, long pageId) { int step = 1; - int index = U.safeAbs(FullPageId.hashCode(cacheId, pageId)) % capacity; + int idx = U.safeAbs(FullPageId.hashCode(cacheId, pageId)) % capacity; - int foundIndex = -1; + int foundIdx = -1; do { - long res = testKeyAt(index, cacheId, pageId, tag); + long res = testKeyAt(idx, cacheId, pageId, -1); if (res == EQUAL || res == OUTDATED) { - foundIndex = index; + foundIdx = idx; break; } @@ -359,20 +420,20 @@ public class FullPageIdTable { else assert res == REMOVED || res == NOT_EQUAL; - index++; + idx++; - if (index >= capacity) - index -= capacity; + if (idx >= capacity) + idx -= capacity; } while (++step <= maxSteps); - if (foundIndex != -1) { - setKeyAt(foundIndex, REMOVED_CACHE_GRP_ID, REMOVED_PAGE_ID); + if (foundIdx != -1) { + setRemoved(foundIdx); decrementSize(); } - return foundIndex; + return foundIdx; } /** @@ -383,16 +444,16 @@ public class FullPageIdTable { long base =
entryBase(index); int grpId = GridUnsafe.getInt(base); - int tag = GridUnsafe.getInt(base + 4); - long pageId = GridUnsafe.getLong(base + 8); + int tag = GridUnsafe.getInt(base + TAG_OFFSET); + long pageId = GridUnsafe.getLong(base + PAGE_ID_OFFSET); - if (pageId == REMOVED_PAGE_ID && grpId == REMOVED_CACHE_GRP_ID) + if (isRemoved(grpId, pageId)) return REMOVED; else if (pageId == testPageId && grpId == testCacheId && tag >= testTag) return EQUAL; else if (pageId == testPageId && grpId == testCacheId && tag < testTag) return OUTDATED; - else if (pageId == EMPTY_PAGE_ID && grpId == EMPTY_CACHE_GRP_ID) + else if (isEmpty(grpId, pageId)) return EMPTY; else return NOT_EQUAL; @@ -406,10 +467,9 @@ public class FullPageIdTable { long base = entryBase(idx); int grpId = GridUnsafe.getInt(base); - long pageId = GridUnsafe.getLong(base + 8); + long pageId = GridUnsafe.getLong(base + PAGE_ID_OFFSET); - return !((pageId == REMOVED_PAGE_ID && grpId == REMOVED_CACHE_GRP_ID) - || (pageId == EMPTY_PAGE_ID && grpId == EMPTY_CACHE_GRP_ID)); + return !isRemoved(grpId, pageId) && !isEmpty(grpId, pageId); } /** @@ -425,15 +485,15 @@ public class FullPageIdTable { } /** - * @param index Entry index. + * @param idx Entry index. * @param grpId Cache group ID to write. * @param pageId Page ID to write. */ - private void setKeyAt(int index, int grpId, long pageId) { - long base = entryBase(index); + private void setKeyAt(int idx, int grpId, long pageId) { + long base = entryBase(idx); GridUnsafe.putInt(base, grpId); - GridUnsafe.putLong(base + 8, pageId); + GridUnsafe.putLong(base + PAGE_ID_OFFSET, pageId); } /** @@ -478,21 +538,17 @@ public class FullPageIdTable { return found ? scans : -scans; } - /** - * Scans all the elements in this table. - * - * @param visitor Visitor. 
- */ - public void visitAll(IgniteBiInClosure<FullPageId, Long> visitor) { + /** {@inheritDoc} */ + @Override public void forEach(BiConsumer<FullPageId, Long> act) { for (int i = 0; i < capacity; i++) { if (isValuePresentAt(i)) { long base = entryBase(i); int cacheId = GridUnsafe.getInt(base); - long pageId = GridUnsafe.getLong(base + 8); - long val = GridUnsafe.getLong(base + 16); + long pageId = GridUnsafe.getLong(base + PAGE_ID_OFFSET); + long val = GridUnsafe.getLong(base + VALUE_OFFSET); - visitor.apply(new FullPageId(pageId, cacheId), val); + act.accept(new FullPageId(pageId, cacheId), val); } } } @@ -502,7 +558,7 @@ public class FullPageIdTable { * @return Value. */ private long valueAt(int index) { - return GridUnsafe.getLong(entryBase(index) + 16); + return GridUnsafe.getLong(entryBase(index) + VALUE_OFFSET); } /** @@ -510,27 +566,31 @@ public class FullPageIdTable { * @param value Value. */ private void setValueAt(int index, long value) { - GridUnsafe.putLong(entryBase(index) + 16, value); + GridUnsafe.putLong(entryBase(index) + VALUE_OFFSET, value); } - private long entryBase(int index) { - return valPtr + 8 + (long)index * BYTES_PER_ENTRY; + /** + * @param idx Entry index. + * @return address of entry. + */ + private long entryBase(int idx) { + return valPtr + 8 + (long)idx * BYTES_PER_ENTRY; } /** - * @param index Index to get tag for. + * @param idx Index to get tag for. * @return Tag at the given index. */ - private int tagAt(int index) { - return GridUnsafe.getInt(entryBase(index) + 4); + private int tagAt(int idx) { + return GridUnsafe.getInt(entryBase(idx) + TAG_OFFSET); } /** - * @param index Index to set tag for. + * @param idx Index to set tag for. * @param tag Tag to set at the given index. 
*/ - private void setTagAt(int index, int tag) { - GridUnsafe.putInt(entryBase(index) + 4, tag); + private void setTagAt(int idx, int tag) { + GridUnsafe.putInt(entryBase(idx) + TAG_OFFSET, tag); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/LoadedPagesMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/LoadedPagesMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/LoadedPagesMap.java new file mode 100644 index 0000000..1b23238 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/LoadedPagesMap.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.util.function.BiConsumer; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.util.GridLongList; + +/** + * Interface for storing correspondence of page ID in a cache group to long value (address in offheap segment). <br> + * Map supports versioning of entries. An outdated entry (entry having version lower than requested) + * is not returned by get; the outdated return value is provided instead.<br> + * + * This mapping is not thread safe. Operations should be protected by outside locking.<br> + */ +public interface LoadedPagesMap { + /** + * Gets value associated with the given key. + * + * @param grpId Cache Group ID. First part of the key. + * @param pageId Page ID. Second part of the key. + * @param reqVer Requested entry version, counter associated with value. + * @param absent Value to return if provided page is not present in map. + * @param outdated Value to return if provided {@code reqVer} version is greater than the version stored in map (was used for put). + * @return A value associated with the given key. + */ + public long get(int grpId, long pageId, int reqVer, long absent, long outdated); + + /** + * Associates the given key with the given value. + * + * @param grpId Cache Group ID. First part of the key. + * @param pageId Page ID. Second part of the key. + * @param val Value to set. + * @param ver Version/counter associated with value, can be used to check if value is outdated. + */ + public void put(int grpId, long pageId, long val, int ver); + + /** + * Refresh outdated value. Sets provided version to value associated with cache and page. + * Method should be called only for key present and only if version was outdated. + * Method may be called in case {@link #get(int, long, int, long, long)} returned {@code outdated} return value. + * + * @param grpId First part of the key. Cache Group ID. + * @param pageId Second part of the key. Page ID.
+ * @param ver Partition tag. + * @return A value associated with the given key. + * @throws IllegalArgumentException if method is called for absent key or key with fresh version. + */ + public long refresh(int grpId, long pageId, int ver); + + /** + * Removes key-value association for the given key. + * @param grpId First part of the key. Cache Group ID. + * @param pageId Second part of the key. Page ID. + * @return {@code True} if value was actually found and removed. + */ + public boolean remove(int grpId, long pageId); + + /** + * @return Maximum number of entries in the map. This maximum can not be always reached. + */ + public int capacity(); + + /** + * @return Current number of entries in the map. + */ + public int size(); + + /** + * Finds the nearest present value from the specified position to the right. + * + * @param idxStart Index to start searching from. Bounded with {@link #capacity()}. + * @return Closest value to the index and its partition tag, or {@code null} if no values are + * present. + */ + public ReplaceCandidate getNearestAt(int idxStart); + + + /** + * Removes entities matching provided predicate at specified mapping range. + * + * @param startIdxToClear Start index of the range to clear, inclusive. Bounded with {@link #capacity()}. + * @param endIdxToClear End index of the range to clear, exclusive. Bounded with {@link #capacity()}. + * @param keyPred Test predicate for (cache group ID, page ID). + * @return List with removed values, value is not added to list for empty cell or if key is not matching to + * predicate. + */ + public GridLongList removeIf(int startIdxToClear, int endIdxToClear, KeyPredicate keyPred); + + + /** + * Removes entities matching provided predicate. + * + * @param keyPred Test predicate for (cache group ID, page ID). + * @return List with removed values, value is not added to list for empty cell or if key is not matching to + * predicate.
+ */ + default GridLongList removeIf(KeyPredicate keyPred) { + return removeIf(0, capacity(), keyPred); + } + + /** + * Scans all the elements in this table. + * + * @param act Visitor/action to be applied to each not empty cell. + */ + public void forEach(BiConsumer<FullPageId, Long> act); + + /** + * Interface describing a predicate for Key (cache group ID, page ID). Usage of this predicate prevents odd object + * creation. + */ + @FunctionalInterface public interface KeyPredicate { + /** + * Predicate body. + * + * @param grpId Cache group ID. + * @param pageId Page ID. + */ + boolean test(int grpId, long pageId); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java index 9c37508..e889fc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java @@ -117,7 +117,7 @@ public interface PageMemoryEx extends PageMemory { * the {@link #beginCheckpoint()} method call. * @param outBuf Temporary buffer to write changes into. * @param tracker Checkpoint metrics tracker. - * @return {@code Partition tag} if data was read, {@code null} otherwise (data already saved to storage). + * @return {@code Partition generation} if data was read, {@code null} otherwise (data already saved to storage). * @throws IgniteException If failed to obtain page data. 
*/ @Nullable public Integer getForCheckpoint(FullPageId pageId, ByteBuffer outBuf, CheckpointMetricsTracker tracker); @@ -127,7 +127,7 @@ public interface PageMemoryEx extends PageMemory { * * @param grpId Group ID. * @param partId Partition ID. - * @return New partition tag (growing 1-based partition file version). + * @return New partition generation (growing 1-based partition file version). */ public int invalidate(int grpId, int partId); @@ -141,9 +141,9 @@ public interface PageMemoryEx extends PageMemory { /** * Asynchronously clears pages satisfying the given predicate. * - * @param pred Predicate for cache group id, pageId and partition tag. + * @param pred Predicate for cache group id, pageId. * @param cleanDirty Flag indicating that dirty pages collection should be cleaned. * @return Future that will be completed when all pages are cleared. */ - public IgniteInternalFuture<Void> clearAsync(GridPredicate3<Integer, Long, Integer> pred, boolean cleanDirty); + public IgniteInternalFuture<Void> clearAsync(LoadedPagesMap.KeyPredicate pred, boolean cleanDirty); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index 27c9b38..fa10a1f 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import 
java.util.List; import java.util.Map; import java.util.Set; @@ -38,6 +37,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; @@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockSta import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListMetaIO; +import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionCountersIO; @@ -76,9 +77,7 @@ import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.OffheapReadWriteLock; import org.apache.ignite.internal.util.future.CountDownFuture; import org.apache.ignite.internal.util.lang.GridInClosure3X; -import org.apache.ignite.internal.util.lang.GridPredicate3; import org.apache.ignite.internal.util.offheap.GridOffHeapOutOfMemoryException; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; @@ -157,7 +156,7 @@ public class PageMemoryImpl implements PageMemoryEx { /** Page lock offset. */ public static final int PAGE_LOCK_OFFSET = 32; - /** Page lock offset. 
*/ + /** Page temp copy buffer relative pointer offset. */ public static final int PAGE_TMP_BUF_OFFSET = 40; /** @@ -189,6 +188,10 @@ public class PageMemoryImpl implements PageMemoryEx { /** Number of used pages in checkpoint buffer. */ private final AtomicInteger cpBufPagesCntr = new AtomicInteger(0); + /** Use new implementation of loaded pages table: 'Robin Hood hashing: backward shift deletion'. */ + private final boolean useBackwardShiftMap + = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_LOADED_PAGES_BACKWARD_SHIFT_MAP, true); + /** */ private ExecutorService asyncRunner = new ThreadPoolExecutor( 0, @@ -483,7 +486,7 @@ public class PageMemoryImpl implements PageMemoryEx { long relPtr = seg.loadedPages.get( cacheId, PageIdUtils.effectivePageId(pageId), - seg.partTag(cacheId, partId), + seg.partGeneration(cacheId, partId), INVALID_REL_PTR, OUTDATED_REL_PTR ); @@ -536,7 +539,7 @@ public class PageMemoryImpl implements PageMemoryEx { } } - seg.loadedPages.put(cacheId, PageIdUtils.effectivePageId(pageId), relPtr, seg.partTag(cacheId, partId)); + seg.loadedPages.put(cacheId, PageIdUtils.effectivePageId(pageId), relPtr, seg.partGeneration(cacheId, partId)); } catch (IgniteOutOfMemoryException oom) { DataRegionConfiguration dataRegionCfg = getDataRegionConfiguration(); @@ -628,7 +631,7 @@ public class PageMemoryImpl implements PageMemoryEx { long relPtr = seg.loadedPages.get( cacheId, PageIdUtils.effectivePageId(pageId), - seg.partTag(cacheId, partId), + seg.partGeneration(cacheId, partId), INVALID_REL_PTR, INVALID_REL_PTR ); @@ -659,7 +662,7 @@ public class PageMemoryImpl implements PageMemoryEx { long relPtr = seg.loadedPages.get( cacheId, PageIdUtils.effectivePageId(pageId), - seg.partTag(cacheId, partId), + seg.partGeneration(cacheId, partId), INVALID_REL_PTR, OUTDATED_REL_PTR ); @@ -688,7 +691,7 @@ public class PageMemoryImpl implements PageMemoryEx { cacheId, PageIdUtils.effectivePageId(pageId), relPtr, - seg.partTag(cacheId, partId) + 
seg.partGeneration(cacheId, partId) ); long pageAddr = absPtr + PAGE_OVERHEAD; @@ -788,7 +791,7 @@ public class PageMemoryImpl implements PageMemoryEx { private long refreshOutdatedPage(Segment seg, int cacheId, long pageId, boolean rmv) { assert seg.writeLock().isHeldByCurrentThread(); - int tag = seg.partTag(cacheId, PageIdUtils.partId(pageId)); + int tag = seg.partGeneration(cacheId, PageIdUtils.partId(pageId)); long relPtr = seg.loadedPages.refresh(cacheId, PageIdUtils.effectivePageId(pageId), tag); @@ -812,7 +815,7 @@ public class PageMemoryImpl implements PageMemoryEx { } if (rmv) - seg.loadedPages.remove(cacheId, PageIdUtils.effectivePageId(pageId), tag); + seg.loadedPages.remove(cacheId, PageIdUtils.effectivePageId(pageId)); if (seg.segCheckpointPages != null) seg.segCheckpointPages.remove(new FullPageId(pageId, cacheId)); @@ -1026,7 +1029,7 @@ public class PageMemoryImpl implements PageMemoryEx { int tag; - boolean tmpBuffer = false; + boolean pageSingleAcquire = false; seg.readLock().lock(); @@ -1034,7 +1037,7 @@ public class PageMemoryImpl implements PageMemoryEx { if (!isInCheckpoint(fullId)) return null; - tag = seg.partTag(fullId.groupId(), PageIdUtils.partId(fullId.pageId())); + tag = seg.partGeneration(fullId.groupId(), PageIdUtils.partId(fullId.pageId())); relPtr = seg.loadedPages.get( fullId.groupId(), @@ -1055,7 +1058,7 @@ public class PageMemoryImpl implements PageMemoryEx { if (PageHeader.tempBufferPointer(absPtr) == INVALID_REL_PTR) PageHeader.acquirePage(absPtr); else - tmpBuffer = true; + pageSingleAcquire = true; } } finally { @@ -1070,7 +1073,7 @@ public class PageMemoryImpl implements PageMemoryEx { relPtr = seg.loadedPages.get( fullId.groupId(), PageIdUtils.effectivePageId(fullId.pageId()), - seg.partTag( + seg.partGeneration( fullId.groupId(), PageIdUtils.partId(fullId.pageId()) ), @@ -1099,19 +1102,22 @@ public class PageMemoryImpl implements PageMemoryEx { } } else - return copyPageForCheckpoint(absPtr, fullId, outBuf, tmpBuffer, 
tracker) ? tag : null; + return copyPageForCheckpoint(absPtr, fullId, outBuf, pageSingleAcquire, tracker) ? tag : null; } /** * @param absPtr Absolute ptr. * @param fullId Full id. - * @param tmpBuf Tmp buffer. + * @param outBuf Output buffer to write page content into. + * @param pageSingleAcquire Page is acquired only once. We don't pin the page second time (until page will not be + * copied) in case checkpoint temporary buffer is used. + * @param tracker Checkpoint statistics tracker. */ private boolean copyPageForCheckpoint( long absPtr, FullPageId fullId, - ByteBuffer tmpBuf, - boolean tmpBuffer, + ByteBuffer outBuf, + boolean pageSingleAcquire, CheckpointMetricsTracker tracker ) { assert absPtr != 0; @@ -1131,7 +1137,7 @@ public class PageMemoryImpl implements PageMemoryEx { long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr); - copyInBuffer(tmpAbsPtr, tmpBuf); + copyInBuffer(tmpAbsPtr, outBuf); GridUnsafe.setMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize(), (byte)0); @@ -1145,12 +1151,12 @@ public class PageMemoryImpl implements PageMemoryEx { // Need release again because we pin page when resolve abs pointer, // and page did not have tmp buffer page. - if (!tmpBuffer) + if (!pageSingleAcquire) PageHeader.releasePage(absPtr); } else { - copyInBuffer(absPtr, tmpBuf); + copyInBuffer(absPtr, outBuf); PageHeader.dirty(absPtr, false); @@ -1158,8 +1164,8 @@ public class PageMemoryImpl implements PageMemoryEx { PageHeader.releasePage(absPtr); } - assert PageIO.getType(tmpBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId()); - assert PageIO.getVersion(tmpBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId()); + assert PageIO.getType(outBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId()); + assert PageIO.getVersion(outBuf) != 0 : "Invalid state. Version is 0! 
pageId = " + U.hexLong(fullId.pageId()); return true; } @@ -1199,7 +1205,7 @@ public class PageMemoryImpl implements PageMemoryEx { seg.writeLock().lock(); try { - int newTag = seg.incrementPartTag(grpId, partId); + int newTag = seg.incrementPartGeneration(grpId, partId); if (tag == 0) tag = newTag; @@ -1220,7 +1226,7 @@ public class PageMemoryImpl implements PageMemoryEx { seg.writeLock().lock(); try { - seg.resetPartTags(grpId); + seg.resetGroupPartitionsGeneration(grpId); } finally { seg.writeLock().unlock(); @@ -1230,7 +1236,7 @@ public class PageMemoryImpl implements PageMemoryEx { /** {@inheritDoc} */ @Override public IgniteInternalFuture<Void> clearAsync( - GridPredicate3<Integer, Long, Integer> pred, + LoadedPagesMap.KeyPredicate pred, boolean cleanDirty ) { CountDownFuture completeFut = new CountDownFuture(segments.length); @@ -1811,16 +1817,22 @@ public class PageMemoryImpl implements PageMemoryEx { /** */ private static final double FULL_SCAN_THRESHOLD = 0.4; + /** Size of acquired pages integer counter, in bytes. */ + private static final int ACQUIRED_PAGES_SIZEOF = 4; + + /** Padding to read from word beginning. */ + private static final int ACQUIRED_PAGES_PADDING = 4; + /** Page ID to relative pointer map. */ - private FullPageIdTable loadedPages; + private LoadedPagesMap loadedPages; - /** */ + /** Pointer to acquired pages integer counter. */ private long acquiredPagesPtr; /** */ private PagePool pool; - /** */ + /** Bytes required to store {@link #loadedPages}. */ private long memPerTbl; /** Pages marked as dirty since the last checkpoint. */ @@ -1832,8 +1844,11 @@ public class PageMemoryImpl implements PageMemoryEx { /** */ private final int maxDirtyPages; - /** Maps partition (cacheId, partId) to its tag. Tag is 1-based incrementing partition file counter */ - private final Map<T2<Integer, Integer>, Integer> partTagMap = new HashMap<>(); + /** Initial partition generation.
*/ + private static final int INIT_PART_GENERATION = 1; + + /** Maps partition (cacheId, partId) to its generation. Generation is 1-based incrementing partition counter. */ + private final Map<GroupPartitionId, Integer> partGenerationMap = new HashMap<>(); /** */ private boolean closed; @@ -1847,15 +1862,23 @@ public class PageMemoryImpl implements PageMemoryEx { int pages = (int)(totalMemory / sysPageSize); - memPerTbl = requiredSegmentTableMemory(pages); - acquiredPagesPtr = region.address(); GridUnsafe.putIntVolatile(null, acquiredPagesPtr, 0); - loadedPages = new FullPageIdTable(region.address() + 8, memPerTbl, true); + int ldPagesMapOffInRegion = ACQUIRED_PAGES_SIZEOF + ACQUIRED_PAGES_PADDING; + + long ldPagesAddr = region.address() + ldPagesMapOffInRegion; + + memPerTbl = useBackwardShiftMap + ? RobinHoodBackwardShiftHashMap.requiredMemory(pages) + : requiredSegmentTableMemory(pages); - DirectMemoryRegion poolRegion = region.slice(memPerTbl + 8); + loadedPages = useBackwardShiftMap + ? new RobinHoodBackwardShiftHashMap(ldPagesAddr, memPerTbl) + : new FullPageIdTable(ldPagesAddr, memPerTbl, true); + + DirectMemoryRegion poolRegion = region.slice(memPerTbl + ldPagesMapOffInRegion); pool = new PagePool(idx, poolRegion, null); @@ -1980,7 +2003,7 @@ public class PageMemoryImpl implements PageMemoryEx { saveDirtyPage.writePage( fullPageId, wrapPointer(absPtr + PAGE_OVERHEAD, pageSize()), - partTag( + partGeneration( fullPageId.groupId(), PageIdUtils.partId(fullPageId.pageId()) ) @@ -2102,13 +2125,13 @@ public class PageMemoryImpl implements PageMemoryEx { // We need to lookup for pages only in current segment for thread safety, // so peeking random memory will lead to checking for found page segment. // It's much faster to check available pages for segment right away. 
- EvictCandidate nearest = loadedPages.getNearestAt(rnd.nextInt(cap), INVALID_REL_PTR); + ReplaceCandidate nearest = loadedPages.getNearestAt(rnd.nextInt(cap)); assert nearest != null && nearest.relativePointer() != INVALID_REL_PTR; long rndAddr = nearest.relativePointer(); - int partTag = nearest.tag(); + int partGen = nearest.generation(); final long absPageAddr = absolute(rndAddr); @@ -2118,7 +2141,7 @@ public class PageMemoryImpl implements PageMemoryEx { assert fullId.equals(nearest.fullId()) : "Invalid page mapping [tableId=" + nearest.fullId() + ", actual=" + fullId + ", nearest=" + nearest; - boolean outdated = partTag < partTag(fullId.groupId(), PageIdUtils.partId(fullId.pageId())); + boolean outdated = partGen < partGeneration(fullId.groupId(), PageIdUtils.partId(fullId.pageId())); if (outdated) return refreshOutdatedPage(this, fullId.groupId(), fullId.pageId(), true); @@ -2184,11 +2207,7 @@ public class PageMemoryImpl implements PageMemoryEx { loadedPages.remove( fullPageId.groupId(), - PageIdUtils.effectivePageId(fullPageId.pageId()), - partTag( - fullPageId.groupId(), - PageIdUtils.partId(fullPageId.pageId()) - ) + PageIdUtils.effectivePageId(fullPageId.pageId()) ); return relRmvAddr; @@ -2231,19 +2250,19 @@ public class PageMemoryImpl implements PageMemoryEx { int failToPrepare = 0; for (int i = 0; i < cap; i++) { - final EvictCandidate nearest = loadedPages.getNearestAt(i, INVALID_REL_PTR); + final ReplaceCandidate nearest = loadedPages.getNearestAt(i); assert nearest != null && nearest.relativePointer() != INVALID_REL_PTR; final long addr = nearest.relativePointer(); - int partTag = nearest.tag(); + int partGen = nearest.generation(); final long absPageAddr = absolute(addr); FullPageId fullId = PageHeader.fullPageId(absPageAddr); - if (partTag < partTag(fullId.groupId(), PageIdUtils.partId(fullId.pageId()))) + if (partGen < partGeneration(fullId.groupId(), PageIdUtils.partId(fullId.pageId()))) return refreshOutdatedPage(this, fullId.groupId(), 
fullId.pageId(), true); boolean pinned = PageHeader.isAcquired(absPageAddr); @@ -2261,11 +2280,7 @@ public class PageMemoryImpl implements PageMemoryEx { if (preparePageRemoval(fullPageId, absEvictAddr, saveDirtyPage)) { loadedPages.remove( fullPageId.groupId(), - PageIdUtils.effectivePageId(fullPageId.pageId()), - partTag( - fullPageId.groupId(), - PageIdUtils.partId(fullPageId.pageId()) - ) + PageIdUtils.effectivePageId(fullPageId.pageId()) ); return addr; @@ -2309,64 +2324,55 @@ public class PageMemoryImpl implements PageMemoryEx { /** * @param grpId Cache group ID. * @param partId Partition ID. - * @return Partition tag. Growing 1 based partition file version + * @return Partition generation. Growing, 1-based partition version. Incremented on partition invalidation. */ - private int partTag(int grpId, int partId) { + private int partGeneration(int grpId, int partId) { assert getReadHoldCount() > 0 || getWriteHoldCount() > 0; - Integer tag = partTagMap.get(new T2<>(grpId, partId)); - - if (tag == null) - return 1; + Integer tag = partGenerationMap.get(new GroupPartitionId(grpId, partId)); - return tag; + return tag == null ? INIT_PART_GENERATION : tag; } /** + * Increments partition generation due to partition invalidation (e.g. partition was rebalanced to other node + * and evicted). + * * @param grpId Cache group ID. * @param partId Partition ID. - * @return New partition generation.
*/ - private int incrementPartTag(int grpId, int partId) { + private int incrementPartGeneration(int grpId, int partId) { assert getWriteHoldCount() > 0; - T2<Integer, Integer> t = new T2<>(grpId, partId); + GroupPartitionId grpPart = new GroupPartitionId(grpId, partId); - Integer tag = partTagMap.get(t); + Integer gen = partGenerationMap.get(grpPart); - if (tag == null) { - partTagMap.put(t, 2); + if (gen == null) + gen = INIT_PART_GENERATION; - return 2; - } - else if (tag == Integer.MAX_VALUE) { + if (gen == Integer.MAX_VALUE) { U.warn(log, "Partition tag overflow [grpId=" + grpId + ", partId=" + partId + "]"); - partTagMap.put(t, 0); + partGenerationMap.put(grpPart, 0); return 0; } else { - partTagMap.put(t, tag + 1); + partGenerationMap.put(grpPart, gen + 1); - return tag + 1; + return gen + 1; } } /** * @param grpId Cache group id. */ - private void resetPartTags(int grpId) { + private void resetGroupPartitionsGeneration(int grpId) { assert getWriteHoldCount() > 0; - Iterator<T2<Integer, Integer>> iter = partTagMap.keySet().iterator(); - - while (iter.hasNext()) { - T2<Integer, Integer> t = iter.next(); - - if (t.get1() == grpId) - iter.remove(); - } + partGenerationMap.keySet().removeIf(grpPart -> grpPart.getGroupId() == grpId); } } @@ -2552,14 +2558,19 @@ public class PageMemoryImpl implements PageMemoryEx { } /** + * Sets pointer to checkpoint buffer. + * * @param absPtr Page absolute pointer. - * @param tmpRelPtr Temp buffer relative pointer. + * @param tmpRelPtr Temp buffer relative pointer or {@link #INVALID_REL_PTR} if page is not copied to checkpoint + * buffer. */ private static void tempBufferPointer(long absPtr, long tmpRelPtr) { GridUnsafe.putLong(absPtr + PAGE_TMP_BUF_OFFSET, tmpRelPtr); } /** + * Gets pointer to checkpoint buffer or {@link #INVALID_REL_PTR} if page is not copied to checkpoint buffer. + * * @param absPtr Page absolute pointer. * @return Temp buffer relative pointer. 
*/ @@ -2637,8 +2648,8 @@ public class PageMemoryImpl implements PageMemoryEx { /** */ private Segment seg; - /** */ - private GridPredicate3<Integer, Long, Integer> clearPred; + /** Clear element filter for (cache group ID, page ID). */ + LoadedPagesMap.KeyPredicate clearPred; /** */ private CountDownFuture doneFut; @@ -2651,12 +2662,12 @@ public class PageMemoryImpl implements PageMemoryEx { /** * @param seg Segment. - * @param clearPred Clear predicate. + * @param clearPred Clear predicate for (cache group ID, page ID). * @param doneFut Completion future. */ private ClearSegmentRunnable( Segment seg, - GridPredicate3<Integer, Long, Integer> clearPred, + LoadedPagesMap.KeyPredicate clearPred, boolean rmvDirty, CountDownFuture doneFut, int pageSize @@ -2682,14 +2693,11 @@ public class PageMemoryImpl implements PageMemoryEx { seg.writeLock().lock(); try { - while (base < boundary) { - long ptr = seg.loadedPages.clearAt(base, clearPred, INVALID_REL_PTR); + GridLongList list = seg.loadedPages.removeIf(base, boundary, clearPred); - if (ptr != INVALID_REL_PTR) - ptrs.add(ptr); + ptrs.addAll(list); - base++; - } + base = boundary; } finally { seg.writeLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplaceCandidate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplaceCandidate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplaceCandidate.java new file mode 100644 index 0000000..d760f07 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplaceCandidate.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Immutable replacement candidate: a page taken from the loaded pages table. A candidate is usually found by + * probing a random position of {@link LoadedPagesMap}. + */ +public class ReplaceCandidate { + /** Partition generation saved in map, too old value means page may be safely cleared. */ + private final int gen; + + /** Relative pointer to page; excluded from default string form, printed as hex by {@link #toString()}. */ + @GridToStringExclude + private final long relPtr; + + /** Full page ID. */ + @GridToStringInclude + private final FullPageId fullId; + + /** + * @param gen Partition generation. + * @param relPtr Relative pointer to page. + * @param fullId Full page ID. + */ + public ReplaceCandidate(int gen, long relPtr, FullPageId fullId) { + this.gen = gen; + this.relPtr = relPtr; + this.fullId = fullId; + } + + /** + * @return Partition generation saved in map, too old value means page may be safely cleared. 
+ */ + public int generation() { + return gen; + } + + /** + * @return Relative pointer to page.
+ */ + public long relativePointer() { + return relPtr; + } + + /** + * @return Full page ID. + */ + public FullPageId fullId() { + return fullId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ReplaceCandidate.class, this, "relPtr", U.hexLong(relPtr)); + } +}