IGNITE-6113 Fixed partition eviction preventing exchange completion - Fixes #3445.
Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6dc5804a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6dc5804a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6dc5804a Branch: refs/heads/master Commit: 6dc5804af6dda90ca210a39d622964d78e3890f1 Parents: 7b9526b Author: Pavel Kovalenko <jokse...@gmail.com> Authored: Tue Mar 6 16:27:22 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Tue Mar 6 16:27:22 2018 +0300 ---------------------------------------------------------------------- .../junits/IgniteCompatibilityNodeRunner.java | 1 + .../org/apache/ignite/cache/CacheMetrics.java | 5 + .../communication/GridIoMessageFactory.java | 6 + .../cache/CacheClusterMetricsMXBeanImpl.java | 5 + .../processors/cache/CacheGroupContext.java | 13 + .../cache/CacheLocalMetricsMXBeanImpl.java | 5 + .../processors/cache/CacheMetricsImpl.java | 16 + .../processors/cache/CacheMetricsSnapshot.java | 9 + .../processors/cache/GridCacheIoManager.java | 6 - .../GridCachePartitionExchangeManager.java | 11 +- .../processors/cache/GridCachePreloader.java | 16 +- .../cache/GridCachePreloaderAdapter.java | 12 +- .../cache/IgniteCacheOffheapManager.java | 14 +- .../cache/IgniteCacheOffheapManagerImpl.java | 110 ++-- .../cache/IgniteRebalanceIterator.java | 23 +- .../dht/GridClientPartitionTopology.java | 15 +- .../distributed/dht/GridDhtLocalPartition.java | 485 ++++++++++++---- .../dht/GridDhtPartitionTopologyImpl.java | 190 ++++--- .../dht/GridDhtPartitionsEvictor.java | 140 +++++ .../dht/GridDhtPartitionsReservation.java | 6 +- .../CachePartitionFullCountersMap.java | 22 + .../CachePartitionPartialCountersMap.java | 44 +- .../GridDhtPartitionDemandLegacyMessage.java | 435 +++++++++++++++ .../GridDhtPartitionDemandMessage.java | 201 +++---- .../dht/preloader/GridDhtPartitionDemander.java | 445 ++++++++------- 
.../dht/preloader/GridDhtPartitionSupplier.java | 548 ++++++++----------- .../GridDhtPartitionSupplyMessage.java | 22 +- .../GridDhtPartitionsExchangeFuture.java | 3 +- .../dht/preloader/GridDhtPreloader.java | 198 ++----- .../IgniteDhtDemandedPartitionsMap.java | 218 ++++++++ .../IgniteDhtPartitionCountersMap.java | 6 + .../IgniteDhtPartitionHistorySuppliersMap.java | 6 + .../IgniteDhtPartitionsToReloadMap.java | 6 + .../dht/preloader/IgniteHistoricalIterator.java | 39 ++ .../preloader/IgniteRebalanceIteratorImpl.java | 201 +++++++ .../persistence/GridCacheOffheapManager.java | 138 +++-- .../IgniteCachePartitionLossPolicySelfTest.java | 2 +- ...achePartitionPartialCountersMapSelfTest.java | 57 ++ .../GridCacheRebalancingSyncSelfTest.java | 3 +- ...idCacheRebalancingWithAsyncClearingTest.java | 240 ++++++++ .../IgnitePdsCacheRebalancingAbstractTest.java | 77 ++- .../file/IgnitePdsDiskErrorsRecoveringTest.java | 15 - .../wal/IgniteWalHistoryReservationsTest.java | 21 - .../db/wal/WalRecoveryTxLogicalRecordsTest.java | 103 ++-- .../platform/PlatformCacheWriteMetricsTask.java | 5 + .../junits/common/GridCommonAbstractTest.java | 38 ++ .../testsuites/IgniteCacheTestSuite2.java | 3 + .../testsuites/IgniteCacheTestSuite3.java | 2 + ...GridCacheLazyQueryPartitionsReleaseTest.java | 9 +- .../impl/cache/CacheBasedDatasetTest.java | 2 - 50 files changed, 2982 insertions(+), 1215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testframework/junits/IgniteCompatibilityNodeRunner.java ---------------------------------------------------------------------- diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testframework/junits/IgniteCompatibilityNodeRunner.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testframework/junits/IgniteCompatibilityNodeRunner.java index 
7a72ea6..6c58887 100644 --- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testframework/junits/IgniteCompatibilityNodeRunner.java +++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testframework/junits/IgniteCompatibilityNodeRunner.java @@ -109,6 +109,7 @@ public class IgniteCompatibilityNodeRunner extends IgniteNodeRunner { watchdog.interrupt(); } catch (Throwable e) { + e.printStackTrace(); X.println("Dumping classpath, error occurred: " + e); dumpClasspath(); throw e; http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 609f345..0b1cb87 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -536,6 +536,11 @@ public interface CacheMetrics { public long getRebalancingStartTime(); /** + * @return Number of partitions need to be cleared before actual rebalance start. + */ + public long getRebalanceClearingPartitionsLeft(); + + /** * Checks whether statistics collection is enabled in this cache. * <p> * The default value is {@code false}. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index e29a316..0f952d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -85,6 +85,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.Update import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandLegacyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; @@ -511,6 +512,11 @@ public class GridIoMessageFactory implements MessageFactory { break; case 44: + msg = new GridDhtPartitionDemandLegacyMessage(); + + break; + + case 45: msg = new GridDhtPartitionDemandMessage(); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java index fb9c035..ce6416f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java @@ -405,6 +405,11 @@ class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean { } /** {@inheritDoc} */ + @Override public long getRebalanceClearingPartitionsLeft() { + return cache.clusterMetrics().getRebalanceClearingPartitionsLeft(); + } + + /** {@inheritDoc} */ @Override public boolean isValidForReading() { return cache.clusterMetrics().isValidForReading(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index fd9ea1f..12636f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsEvictor; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; @@ -126,6 +127,9 @@ public class CacheGroupContext { /** */ private GridCachePreloader preldr; + /** Partition evictor. */ + private GridDhtPartitionsEvictor evictor; + /** */ private final DataRegion dataRegion; @@ -245,6 +249,13 @@ public class CacheGroupContext { } /** + * @return Partitions evictor. + */ + public GridDhtPartitionsEvictor evictor() { + return evictor; + } + + /** * @return IO policy for the given cache group. */ public byte ioPolicy() { @@ -881,6 +892,8 @@ public class CacheGroupContext { else preldr = new GridCachePreloaderAdapter(this); + evictor = new GridDhtPartitionsEvictor(this); + if (persistenceEnabled()) { try { offheapMgr = new GridCacheOffheapManager(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java index f62cd15..438c8c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java @@ -406,6 +406,11 @@ class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean { } /** {@inheritDoc} */ + @Override public long getRebalanceClearingPartitionsLeft() { + return 
cache.metrics0().getRebalanceClearingPartitionsLeft(); + } + + /** {@inheritDoc} */ @Override public boolean isValidForReading() { return cache.metrics0().isValidForReading(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 1603ecb..6fae8fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -123,6 +123,9 @@ public class CacheMetricsImpl implements CacheMetrics { /** Rebalancing rate in bytes. */ private HitRateMetrics rebalancingBytesRate = new HitRateMetrics(REBALANCE_RATE_INTERVAL, 20); + /** Number of currently clearing partitions for rebalancing. */ + private AtomicLong rebalanceClearingPartitions = new AtomicLong(); + /** Cache metrics. */ @GridToStringExclude private transient CacheMetricsImpl delegate; @@ -904,6 +907,19 @@ public class CacheMetricsImpl implements CacheMetrics { return rebalanceStartTime.get(); } + /** {@inheritDoc} */ + @Override public long getRebalanceClearingPartitionsLeft() { + return rebalanceClearingPartitions.get(); + } + + /** + * Sets clearing partitions number. + * @param partitions Partitions number. + */ + public void rebalanceClearingPartitions(int partitions) { + rebalanceClearingPartitions.set(partitions); + } + /** * First rebalance supply message callback. * @param keysCnt Estimated number of keys. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index af6a174..e693720 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -209,6 +209,9 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** Estimate rebalance finish time. */ private long rebalanceFinishTime; + /** The number of clearing partitions need to await before rebalance. */ + private long rebalanceClearingPartitionsLeft; + /** */ private String keyType; @@ -329,6 +332,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { rebalancingKeysRate = m.getRebalancingKeysRate(); rebalanceStartTime = m.rebalancingStartTime(); rebalanceFinishTime = m.estimateRebalancingFinishTime(); + rebalanceClearingPartitionsLeft = m.getRebalanceClearingPartitionsLeft(); } /** @@ -760,6 +764,11 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { } /** {@inheritDoc} */ + @Override public long getRebalanceClearingPartitionsLeft() { + return rebalanceClearingPartitionsLeft; + } + + /** {@inheritDoc} */ @Override public boolean isWriteBehindEnabled() { return isWriteBehindEnabled; } http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 11a416f..3182192 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -815,12 +815,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; - case 45: { - processMessage(nodeId, msg, c);// Will be handled by Rebalance Demander. - } - - break; - case 49: { GridNearGetRequest req = (GridNearGetRequest)msg; http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 1bd5591..4e93822 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandLegacyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; @@ -370,6 +371,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return; } + else if (m instanceof GridDhtPartitionDemandLegacyMessage) { + grp.preloader().handleDemandMessage(idx, id, + new GridDhtPartitionDemandMessage((GridDhtPartitionDemandLegacyMessage) m)); + + return; + } } U.error(log, "Unsupported message type: " + m.getClass().getName()); @@ -2240,7 +2247,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { long timeout = cctx.gridConfig().getNetworkTimeout(); - int cnt = 0; + long cnt = 0; while (!isCancelled()) { cnt++; @@ -2413,7 +2420,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // Don't delay for dummy reassigns to avoid infinite recursion. 
if ((delay == 0 || forcePreload) && !disableRebalance) - assigns = grp.preloader().assign(exchId, exchFut); + assigns = grp.preloader().generateAssignments(exchId, exchFut); assignsMap.put(grp.groupId(), assigns); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index dc1624a..5fa7a82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -23,7 +23,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -68,22 +67,22 @@ public interface GridCachePreloader { * @param exchFut Exchange future. * @return Assignments or {@code null} if detected that there are pending exchanges. 
*/ - @Nullable public GridDhtPreloaderAssignments assign(GridDhtPartitionExchangeId exchId, - @Nullable GridDhtPartitionsExchangeFuture exchFut); + @Nullable public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId, + @Nullable GridDhtPartitionsExchangeFuture exchFut); /** * Adds assignments to preloader. * * @param assignments Assignments to add. * @param forcePreload Force preload flag. - * @param cnt Counter. + * @param rebalanceId Rebalance id. * @param next Runnable responsible for cache rebalancing start. * @param forcedRebFut Rebalance future. * @return Rebalancing runnable. */ public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, - int cnt, + long rebalanceId, Runnable next, @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut); @@ -178,13 +177,6 @@ public interface GridCachePreloader { public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d); /** - * Evicts partition asynchronously. - * - * @param part Partition. - */ - public void evictPartitionAsync(GridDhtLocalPartition part); - - /** * @param lastFut Last future. 
*/ public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut); http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index c0accf6..af91679 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -24,7 +24,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -153,26 +152,21 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionExchangeId exchId, - GridDhtPartitionsExchangeFuture exchFut) { + @Override public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId, + GridDhtPartitionsExchangeFuture exchFut) { return null; } /** {@inheritDoc} */ @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, 
- int cnt, + long rebalanceId, Runnable next, @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut) { return null; } /** {@inheritDoc} */ - @Override public void evictPartitionAsync(GridDhtLocalPartition part) { - // No-op. - } - - /** {@inheritDoc} */ @Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 546672c..3d83f87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -21,6 +21,7 @@ import java.util.Map; import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.RootPage; import org.apache.ignite.internal.processors.cache.persistence.RowStore; @@ -234,13 +235,20 @@ public interface IgniteCacheOffheapManager { public GridIterator<CacheDataRow> partitionIterator(final int part) throws IgniteCheckedException; /** - * @param part Partition. + * @param part Partition number. * @param topVer Topology version. - * @param partCntr Partition counter to get historical data if available. 
+ * @return Iterator for given partition that will reserve partition state until it is closed. + * @throws IgniteCheckedException If failed. + */ + public GridCloseableIterator<CacheDataRow> reservedIterator(final int part, final AffinityTopologyVersion topVer) + throws IgniteCheckedException; + + /** + * @param parts Partitions. * @return Partition data iterator. * @throws IgniteCheckedException If failed. */ - public IgniteRebalanceIterator rebalanceIterator(int part, AffinityTopologyVersion topVer, Long partCntr) + public IgniteRebalanceIterator rebalanceIterator(IgniteDhtDemandedPartitionsMap parts, AffinityTopologyVersion topVer) throws IgniteCheckedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index f7892af..b201935 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -19,10 +19,12 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; @@ -36,6 +38,10 @@ import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; @@ -72,6 +78,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; /** * @@ -109,7 +116,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager private int updateValSizeThreshold; /** */ - private GridStripedLock partStoreLock = new GridStripedLock(Runtime.getRuntime().availableProcessors()); + protected GridStripedLock partStoreLock = new GridStripedLock(Runtime.getRuntime().availableProcessors()); /** {@inheritDoc} */ @Override public GridAtomicLong globalRemoveId() { @@ -813,54 +820,95 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public IgniteRebalanceIterator rebalanceIterator(int part, AffinityTopologyVersion topVer, Long partCntr) - throws IgniteCheckedException { - final GridIterator<CacheDataRow> it = partitionIterator(part); + @Override public 
GridCloseableIterator<CacheDataRow> reservedIterator(int part, + AffinityTopologyVersion topVer) throws IgniteCheckedException { + final GridDhtLocalPartition loc = grp.topology().localPartition(part, topVer, false); - return new IgniteRebalanceIterator() { - @Override public boolean historical() { - return false; - } + if (loc == null || !loc.reserve()) + return null; - @Override public boolean hasNextX() throws IgniteCheckedException { - return it.hasNextX(); - } + // It is necessary to check state after reservation to avoid race conditions. + if (loc.state() != OWNING) { + loc.release(); - @Override public CacheDataRow nextX() throws IgniteCheckedException { - return it.nextX(); - } + return null; + } - @Override public void removeX() throws IgniteCheckedException { - it.removeX(); - } + CacheDataStore data = partitionData(part); - @Override public Iterator<CacheDataRow> iterator() { - return it.iterator(); - } + final GridCursor<? extends CacheDataRow> cur = data.cursor(); - @Override public boolean hasNext() { - return it.hasNext(); - } + return new GridCloseableIteratorAdapter<CacheDataRow>() { + /** */ + private CacheDataRow next; - @Override public CacheDataRow next() { - return it.next(); - } + @Override protected CacheDataRow onNext() { + CacheDataRow res = next; - @Override public void close() { + next = null; + return res; } - @Override public boolean isClosed() { - return false; + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (next != null) + return true; + + if (cur.next()) + next = cur.get(); + + return next != null; } - @Override public void remove() { - throw new UnsupportedOperationException(); + @Override protected void onClose() throws IgniteCheckedException { + assert loc != null && loc.state() == OWNING && loc.reservations() > 0; + + loc.release(); } }; } /** {@inheritDoc} */ + @Override public IgniteRebalanceIterator rebalanceIterator(IgniteDhtDemandedPartitionsMap parts, + final AffinityTopologyVersion topVer) 
+ throws IgniteCheckedException { + + final TreeMap<Integer, GridCloseableIterator<CacheDataRow>> iterators = new TreeMap<>(); + Set<Integer> missing = null; + + for (Integer p : parts.fullSet()) { + GridCloseableIterator<CacheDataRow> partIter = reservedIterator(p, topVer); + + if (partIter == null) { + if (missing == null) + missing = new HashSet<>(); + + missing.add(p); + } + else + iterators.put(p, partIter); + } + + IgniteRebalanceIterator iter = new IgniteRebalanceIteratorImpl(iterators, historicalIterator(parts.historicalMap())); + + if (missing != null) { + for (Integer p : missing) + iter.setPartitionMissing(p); + } + + return iter; + } + + /** + * @param partCntrs Partition counters map. + * @return Historical iterator. + */ + @Nullable protected IgniteHistoricalIterator historicalIterator(CachePartitionPartialCountersMap partCntrs) + throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ @Override public final CacheDataStore createCacheDataStore(int p) throws IgniteCheckedException { CacheDataStore dataStore; http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteRebalanceIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteRebalanceIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteRebalanceIterator.java index 4efe640..d2f29d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteRebalanceIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteRebalanceIterator.java @@ -21,11 +21,30 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.util.lang.GridCloseableIterator; /** - * + * Iterator over supplied data for rebalancing. 
*/ public interface IgniteRebalanceIterator extends GridCloseableIterator<CacheDataRow> { /** * @return {@code True} if this iterator is a historical iterator starting from the requested partition counter. */ - public boolean historical(); + public boolean historical(int partId); + + /** + * @param partId Partition ID. + * @return {@code True} if all data for given partition was already returned. + */ + public boolean isPartitionDone(int partId); + + /** + * @param partId Partition ID. + * @return {@code True} if partition was marked as missing. + */ + public boolean isPartitionMissing(int partId); + + /** + * Marks partition as missing. + * + * @param partId Partition ID. + */ + public void setPartitionMissing(int partId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 5c98ee8..9b3c1ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -1107,23 
+1108,21 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { try { for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { GridDhtPartitionMap partMap = e.getValue(); + UUID remoteNodeId = e.getKey(); if (!partMap.containsKey(p)) continue; - if (partMap.get(p) == OWNING && !owners.contains(e.getKey())) { - if (haveHistory) - partMap.put(p, MOVING); - else { - partMap.put(p, RENTING); + if (partMap.get(p) == OWNING && !owners.contains(remoteNodeId)) { + partMap.put(p, MOVING); - result.add(e.getKey()); - } + if (!haveHistory) + result.add(remoteNodeId); partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion()); U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + - "[nodeId=" + e.getKey() + ", groupId=" + grpId + + "[nodeId=" + remoteNodeId + ", groupId=" + grpId + ", partId=" + p + ", haveHistory=" + haveHistory + "]"); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 89ce6dd..119ea91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import 
org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; @@ -47,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -54,6 +56,7 @@ import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -114,6 +117,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements @GridToStringExclude private final GridFutureAdapter<?> rent; + /** Clear future. */ + @GridToStringExclude + private final ClearFuture clearFuture; + /** */ @GridToStringExclude private final GridCacheSharedContext ctx; @@ -148,14 +155,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** */ @GridToStringExclude - private final CacheDataStore store; + private volatile CacheDataStore store; /** Set if failed to move partition to RENTING state due to reservations, to be checked when * reservation is released. 
*/ - private volatile boolean shouldBeRenting; + private volatile boolean delayedRenting; - /** Set if partition must be re-created and preloaded after eviction. */ - private boolean reload; + /** Set if partition must be cleared in MOVING state. */ + private volatile boolean clear; /** * @param ctx Context. @@ -189,6 +196,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } }; + clearFuture = new ClearFuture(); + int delQueueSize = grp.systemCache() ? 100 : Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20); @@ -321,13 +330,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * @return {@code True} if partition is marked for transfer to renting state. - */ - public boolean shouldBeRenting() { - return shouldBeRenting; - } - - /** * @return Reservations. */ public int reservations() { @@ -363,7 +365,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements removeEntry(entry); // Attempt to evict. - tryEvictAsync(false); + tryContinueClearing(); } /** @@ -434,7 +436,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * Reserves a partition so it won't be cleared. + * Reserves a partition so it won't be cleared or evicted. * * @return {@code True} if reserved. */ @@ -479,7 +481,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (reservations == 0) return; - assert getPartState(state) != EVICTED; + assert getPartState(state) != EVICTED : getPartState(state); long newState = setReservations(state, --reservations); newState = setSize(newState, getSize(newState) + sizeChange); @@ -488,10 +490,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements // Decrement reservations. 
if (this.state.compareAndSet(state, newState)) { - if (reservations == 0 && shouldBeRenting) - rent(true); - - tryEvictAsync(false); + // If no more reservations try to continue delayed renting or clearing process. + if (reservations == 0) { + if (delayedRenting) + rent(true); + else + tryContinueClearing(); + } break; } @@ -559,13 +564,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** * Forcibly moves partition to a MOVING state. */ - void moving() { + public void moving() { while (true) { long state = this.state.get(); GridDhtPartitionState partState = getPartState(state); - assert partState == OWNING : "Only OWNed partitions should be moved to MOVING state"; + assert partState == OWNING || partState == RENTING : "Only partitions in state OWNING or RENTING can be moved to MOVING state"; if (casState(state, MOVING)) { if (log.isDebugEnabled()) @@ -598,72 +603,98 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * @return {@code True} if partition should be re-created after it is cleared. - */ - public boolean reload() { - return reload; - } - - /** - * @param value {@code reload} flag value. - */ - public void reload(boolean value) { - reload = value; - } - - /** - * @param updateSeq Update sequence. + * Initiates partition eviction process. + * + * If partition has reservations, eviction will be delayed and continued after all reservations will be released. + * + * @param updateSeq If {@code true} topology update sequence will be updated after eviction is finished. * @return Future to signal that this node is no longer an owner or backup. 
*/ public IgniteInternalFuture<?> rent(boolean updateSeq) { - long state = this.state.get(); + long state0 = this.state.get(); - GridDhtPartitionState partState = getPartState(state); + GridDhtPartitionState partState = getPartState(state0); if (partState == RENTING || partState == EVICTED) return rent; - shouldBeRenting = true; + delayedRenting = true; - if (getReservations(state) == 0 && casState(state, RENTING)) { - shouldBeRenting = false; + if (getReservations(state0) == 0 && casState(state0, RENTING)) { + delayedRenting = false; if (log.isDebugEnabled()) log.debug("Moved partition to RENTING state: " + this); // Evict asynchronously, as the 'rent' method may be called // from within write locks on local partition. - tryEvictAsync(updateSeq); + clearAsync0(updateSeq); } return rent; } /** + * Starts clearing process asynchronously if it's requested and not running at the moment. + * Method may finish clearing process ahead of time if partition is empty and doesn't have reservations. + * * @param updateSeq Update sequence. */ - public void tryEvictAsync(boolean updateSeq) { + private void clearAsync0(boolean updateSeq) { long state = this.state.get(); GridDhtPartitionState partState = getPartState(state); - if (isEmpty() && !grp.queriesEnabled() && getSize(state) == 0 && - partState == RENTING && getReservations(state) == 0 && !groupReserved() && - casState(state, EVICTED)) { - if (log.isDebugEnabled()) - log.debug("Evicted partition: " + this); + boolean evictionRequested = partState == RENTING || delayedRenting; + boolean clearingRequested = partState == MOVING && clear; - if (markForDestroy()) - finishDestroy(updateSeq); + if (!evictionRequested && !clearingRequested) + return; + + boolean reinitialized = clearFuture.initialize(updateSeq, evictionRequested); + + // Clearing process is already running at the moment. No need to run it again.
+ if (!reinitialized) + return; + + // Try fast eviction + if (isEmpty() && getSize(state) == 0 && !grp.queriesEnabled() + && getReservations(state) == 0 && !groupReserved()) { + + if (partState == RENTING && casState(state, EVICTED) || clearingRequested) { + clearFuture.finish(); + + return; + } } - else if (partState == RENTING || shouldBeRenting()) - grp.preloader().evictPartitionAsync(this); + + grp.evictor().evictPartitionAsync(this); + } + + /** + * Initiates single clear process if partition is in MOVING state. + * Method does nothing if clear process is already running. + */ + public void clearAsync() { + if (state() != MOVING) + return; + + clear = true; + clearAsync0(false); + } + + /** + * Continues delayed clearing of partition if possible. + * Clearing may be delayed because of existing reservations. + */ + void tryContinueClearing() { + clearAsync0(true); } /** * @return {@code true} If there is a group reservation. */ - boolean groupReserved() { + private boolean groupReserved() { for (GridDhtPartitionsReservation reservation : reservations) { if (!reservation.invalidate()) return true; // Failed to invalidate reservation -> we are reserved. @@ -673,7 +704,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * @return {@code True} if evicting thread was added. + * @return {@code true} if evicting thread was added. */ private boolean addEvicting() { while (true) { @@ -682,15 +713,17 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (cnt != 0) return false; - if (evictGuard.compareAndSet(cnt, cnt + 1)) + if (evictGuard.compareAndSet(cnt, cnt + 1)) { + return true; + } } } /** - * + * @return {@code true} if no thread evicting partition at the moment. 
*/ - private void clearEvicting() { + private boolean clearEvicting() { boolean free; while (true) { @@ -705,14 +738,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } } - if (free && state() == EVICTED) { - if (markForDestroy()) - finishDestroy(true); - } + return free; } /** - * @return {@code True} if partition is safe to destroy + * @return {@code True} if partition is safe to destroy. */ private boolean markForDestroy() { while (true) { @@ -727,7 +757,29 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * @param updateSeq Update sequence request. + * Moves partition state to {@code EVICTED} if possible + * and initiates partition destroy process after successfully moving partition state to {@code EVICTED}. + * + * @param updateSeq If {@code true} increment update sequence on cache group topology after successful eviction. + */ + private void finishEviction(boolean updateSeq) { + long state0 = this.state.get(); + + GridDhtPartitionState state = getPartState(state0); + + if (state == EVICTED || (isEmpty() && getSize(state0) == 0 && getReservations(state0) == 0 && state == RENTING && casState(state0, EVICTED))) { + if (log.isDebugEnabled()) + log.debug("Evicted partition: " + this); + + if (markForDestroy()) + finishDestroy(updateSeq); + } + } + + /** + * Destroys partition data store and invokes appropriate callbacks. + * + * @param updateSeq If {@code true} increment update sequence on cache group topology after successful destroy. */ private void finishDestroy(boolean updateSeq) { assert state() == EVICTED : this; @@ -745,31 +797,72 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** + * Awaits completion of partition destroy process in case of {@code EVICTED} partition state.
+ */ + public void awaitDestroy() { + try { + if (state() == EVICTED) + rent.get(); + } catch (IgniteCheckedException e) { + log.error("Unable to await partition destroy " + this, e); + } + } + + /** + * Adds listener on {@link #clearFuture} finish. + * + * @param lsnr Listener. + */ + public void onClearFinished(IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) { + clearFuture.listen(lsnr); + } + + /** + * @return {@code True} if clearing process is running at the moment on the partition. + */ + public boolean isClearing() { + return !clearFuture.isDone(); + } + + /** + * Tries to start partition clear process (see {@link GridDhtLocalPartition#clearAll()}). + * Only one thread is allowed to run this process at a time. + * At the end of clearing the method completes {@code clearFuture}. + * + * @return {@code false} if clearing is not started due to existing reservations. * @throws NodeStoppingException If node is stopping. */ - public void tryEvict() throws NodeStoppingException { - long state = this.state.get(); + public boolean tryClear() throws NodeStoppingException { + if (clearFuture.isDone()) + return true; - GridDhtPartitionState partState = getPartState(state); + long state = this.state.get(); - if (partState != RENTING || getReservations(state) != 0 || groupReserved()) - return; + if (getReservations(state) != 0 || groupReserved()) + return false; if (addEvicting()) { try { // Attempt to evict partition entries from cache. - clearAll(); + long clearedEntities = clearAll(); - if (isEmpty() && getSize(state) == 0 && casState(state, EVICTED)) { - if (log.isDebugEnabled()) - log.debug("Evicted partition: " + this); - // finishDestroy() will be initiated by clearEvicting().
- } + if (log.isDebugEnabled()) + log.debug("Partition is cleared [clearedEntities=" + clearedEntities + ", part=" + this + "]"); + } + catch (NodeStoppingException e) { + clearFuture.finish(e); + + throw e; } finally { - clearEvicting(); + boolean free = clearEvicting(); + + if (free) + clearFuture.finish(); } } + + return true; } /** @@ -785,10 +878,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * + * On partition unlock callback. + * Tries to continue delayed partition clearing. */ void onUnlock() { - tryEvictAsync(false); + tryContinueClearing(); } /** @@ -854,11 +948,12 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * Clears values for this partition. + * Removes all entries and rows from this partition. * + * @return Number of rows cleared from page memory. * @throws NodeStoppingException If node stopping. */ - public void clearAll() throws NodeStoppingException { + private long clearAll() throws NodeStoppingException { GridCacheVersion clearVer = ctx.versions().next(); GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer); @@ -872,6 +967,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements else clear(singleCacheEntryMap.map, extras, rec); + long cleared = 0; + if (!grp.allowFastEviction()) { CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap; @@ -884,6 +981,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements try { CacheDataRow row = it0.next(); + // Do not clear fresh rows in case of single partition clearing. 
+ if (row.version().compareTo(clearVer) >= 0 && (state() == MOVING && clear)) + continue; + if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId())) hld = cacheMapHolder(ctx.cacheContext(row.cacheId())); @@ -897,32 +998,27 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements true, false); - ctx.database().checkpointReadLock(); - - try { - if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { - removeEntry(cached); - - if (rec && !hld.cctx.config().isEventsDisabled()) { - hld.cctx.events().addEvent(cached.partition(), - cached.key(), - ctx.localNodeId(), - (IgniteUuid)null, - null, - EVT_CACHE_REBALANCE_OBJECT_UNLOADED, - null, - false, - cached.rawGet(), - cached.hasValue(), - null, - null, - null, - false); - } + if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { + removeEntry(cached); + + if (rec && !hld.cctx.config().isEventsDisabled()) { + hld.cctx.events().addEvent(cached.partition(), + cached.key(), + ctx.localNodeId(), + (IgniteUuid)null, + null, + EVT_CACHE_REBALANCE_OBJECT_UNLOADED, + null, + false, + cached.rawGet(), + cached.hasValue(), + null, + null, + null, + false); } - } - finally { - ctx.database().checkpointReadUnlock(); + + cleared++; } } catch (GridDhtInvalidPartitionException e) { @@ -939,21 +1035,23 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (log.isDebugEnabled()) log.debug("Failed to get iterator for evicted partition: " + id); - rent.onDone(e); - throw e; } catch (IgniteCheckedException e) { U.error(log, "Failed to get iterator for evicted partition: " + id, e); } } + + return cleared; } /** + * Removes all cache entries from specified {@code map}. + * * @param map Map to clear. * @param extras Obsolete extras. * @param evt Unload event flag. - * @throws NodeStoppingException + * @throws NodeStoppingException If current node is stopping. 
*/ private void clear(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map, GridCacheObsoleteEntryExtras extras, @@ -995,8 +1093,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (log.isDebugEnabled()) log.debug("Failed to clear cache entry for evicted partition: " + cached.partition()); - rent.onDone(e); - throw e; } catch (IgniteCheckedException e) { @@ -1009,7 +1105,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * + * Removes all deferred delete requests from {@code rmvQueue}. */ private void clearDeferredDeletes() { for (RemovedEntryHolder e : rmvQueue) @@ -1098,12 +1194,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements void onCacheStopped(int cacheId) { assert grp.sharedGroup() : grp.cacheOrGroupName(); - for (Iterator<RemovedEntryHolder> it = rmvQueue.iterator(); it.hasNext();) { - RemovedEntryHolder e = it.next(); - - if (e.cacheId() == cacheId) - it.remove(); - } + rmvQueue.removeIf(e -> e.cacheId() == cacheId); cacheMaps.remove(cacheId); } @@ -1222,4 +1313,168 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements return S.toString(RemovedEntryHolder.class, this); } } + + /** + * Future is needed to control partition clearing process. + * Future can be used both for single clearing or eviction processes. + */ + class ClearFuture extends GridFutureAdapter<Boolean> { + /** Flag indicates that eviction callback was registered on the current future. */ + private volatile boolean evictionCallbackRegistered; + + /** Flag indicates that clearing callback was registered on the current future. */ + private volatile boolean clearingCallbackRegistered; + + /** Flag indicates that future with all callbacks was finished. */ + private volatile boolean finished; + + /** + * Constructor. + */ + ClearFuture() { + onDone(); + finished = true; + } + + /** + * Registers finish eviction callback on the future. 
+ * + * @param updateSeq If {@code true} update topology sequence after successful eviction. + */ + private void registerEvictionCallback(boolean updateSeq) { + if (evictionCallbackRegistered) + return; + + synchronized (this) { + // Double check + if (evictionCallbackRegistered) + return; + + evictionCallbackRegistered = true; + + // Initiates partition eviction and destroy. + listen(f -> { + if (f.error() != null) { + rent.onDone(f.error()); + } else if (f.isDone()) { + finishEviction(updateSeq); + } + + evictionCallbackRegistered = false; + }); + } + } + + /** + * Registers clearing callback on the future. + */ + private void registerClearingCallback() { + if (clearingCallbackRegistered) + return; + + synchronized (this) { + // Double check + if (clearingCallbackRegistered) + return; + + clearingCallbackRegistered = true; + + // Recreate cache data store in case of allowed fast eviction, and reset clear flag. + listen(f -> { + clear = false; + + clearingCallbackRegistered = false; + }); + } + } + + /** + * Recreate cache data store after successful clearing and allowed fast eviction. + */ + private void recreateCacheDataStore() { + assert grp.offheap() instanceof GridCacheOffheapManager; + + try { + CacheDataStore store0 = store; + + store = ((GridCacheOffheapManager) grp.offheap()).recreateCacheDataStore(store0); + + // Inject row cache cleaner on store creation + // Used in case the cache with enabled SqlOnheapCache is single cache at the cache group + if (ctx.kernalContext().query().moduleEnabled()) { + GridQueryRowCacheCleaner cleaner = ctx.kernalContext().query().getIndexing() + .rowCacheCleaner(grp.groupId()); + + if (store != null && cleaner != null) + store.setRowCacheCleaner(cleaner); + } + } catch (IgniteCheckedException e) { + finish(e); + } + } + + /** + * Successfully finishes the future. 
+ */ + public void finish() { + if (state() == MOVING && clear && grp.allowFastEviction()) + recreateCacheDataStore(); + + synchronized (this) { + onDone(); + finished = true; + } + } + + /** + * Finishes the future with error. + * + * @param t Error. + */ + public void finish(Throwable t) { + synchronized (this) { + onDone(t); + finished = true; + } + } + + /** + * Reuses future if it's done. + * Adds appropriate callbacks to the future in case of eviction or single clearing. + * + * @param updateSeq Update sequence. + * @param evictionRequested If {@code true} adds eviction callback, in other case adds single clearing callback. + * @return {@code true} if future has been reinitialized. + */ + public boolean initialize(boolean updateSeq, boolean evictionRequested) { + // In case of running clearing just try to add missing callbacks to avoid extra synchronization. + if (!finished) { + if (evictionRequested) + registerEvictionCallback(updateSeq); + else + registerClearingCallback(); + + return false; + } + + synchronized (this) { + boolean done = isDone(); + + if (done) { + reset(); + + finished = false; + evictionCallbackRegistered = false; + clearingCallbackRegistered = false; + } + + if (evictionRequested) + registerEvictionCallback(updateSeq); + else + registerClearingCallback(); + + return done; + } + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 1234042..528f0a6 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -37,6 +38,7 @@ import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; @@ -89,9 +91,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private static final boolean FAST_DIFF_REBUILD = false; /** */ - private static final Long ZERO = 0L; - - /** */ private final GridCacheSharedContext ctx; /** */ @@ -760,7 +759,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { changed = true; if (log.isDebugEnabled()) { - log.debug("Evicting moving partition (it does not belong to affinity) [" + + log.debug("Evicting " + state + " partition (it does not belong to affinity) [" + "grp=" + grp.cacheOrGroupName() + ", part=" + locPart + ']'); } } @@ -813,14 +812,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtLocalPartition loc = locParts.get(p); if (loc == null || loc.state() == EVICTED) { - if (loc != null) { - try { - loc.rent(false).get(); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - 
} + // Make sure that after eviction partition is destroyed. + if (loc != null) + loc.awaitDestroy(); locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); @@ -906,12 +900,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { boolean belongs = partitionLocalNode(p, topVer); if (loc != null && state == EVICTED) { - try { - loc.rent(false).get(); - } - catch (IgniteCheckedException ex) { - throw new IgniteException(ex); - } + // Make sure that after eviction partition is destroyed. + loc.awaitDestroy(); locParts.set(p, loc = null); @@ -924,8 +914,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } else if (loc != null && state == RENTING && !showRenting) { throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " + - "evicted [grp=" + grp.cacheOrGroupName() + ", part=" + p + ", shouldBeMoving=" + - loc.reload() + ", belongs=" + belongs + ", topVer=" + topVer + ", curTopVer=" + this.readyTopVer + "]"); + "evicted [grp=" + grp.cacheOrGroupName() + ", part=" + p + ", shouldBeMoving=" + + ", belongs=" + belongs + ", topVer=" + topVer + ", curTopVer=" + this.readyTopVer + "]"); } if (loc == null) { @@ -937,8 +927,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); - if (updateSeq) - this.updateSeq.incrementAndGet(); + this.updateSeq.incrementAndGet(); created = true; @@ -1483,32 +1472,49 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { else if (state == MOVING) { GridDhtLocalPartition locPart = locParts.get(p); - if (locPart == null || locPart.state() == EVICTED) - locPart = createPartition(p); + if (!partsToReload.contains(p)) { + if (locPart == null || locPart.state() == EVICTED) + locPart = createPartition(p); - if (locPart.state() == OWNING) { - locPart.moving(); + if (locPart.state() == OWNING) { + locPart.moving(); - changed = 
true; + changed = true; + } } - } - else if (state == RENTING && partsToReload.contains(p)) { - GridDhtLocalPartition locPart = locParts.get(p); + else { + if (locPart == null || locPart.state() == EVICTED) { + createPartition(p); - if (locPart == null || locPart.state() == EVICTED) { - createPartition(p); + changed = true; + } + else if (locPart.state() == OWNING || locPart.state() == MOVING) { + if (locPart.state() == OWNING) + locPart.moving(); + locPart.clearAsync(); - changed = true; - } - else if (locPart.state() == OWNING || locPart.state() == MOVING) { - locPart.reload(true); + changed = true; + } + else if (locPart.state() == RENTING) { + // Try to prevent partition eviction. + if (locPart.reserve()) { + try { + locPart.moving(); + locPart.clearAsync(); + } finally { + locPart.release(); + } + } + // In other case just recreate it. + else { + assert locPart.state() == EVICTED; - locPart.rent(false); + createPartition(p); + } - changed = true; + changed = true; + } } - else - locPart.reload(true); } } } @@ -1535,8 +1541,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ctx.exchange().scheduleResendPartitions(); return changed; - } - finally { + } finally { lock.writeLock().unlock(); } } @@ -1855,6 +1860,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** + * Rebuilds {@link #diffFromAffinity} from given assignment. + * * @param affAssignment New affinity assignment. 
*/ private void rebuildDiff(AffinityAssignment affAssignment) { @@ -2078,37 +2085,31 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (locPart != null) { if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId())) { - if (haveHistory) - locPart.moving(); - else { - locPart.rent(false); - - locPart.reload(true); + locPart.moving(); + if (!haveHistory) { + locPart.clearAsync(); result.add(ctx.localNodeId()); } U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + "[nodeId=" + ctx.localNodeId() + ", grp=" + grp.cacheOrGroupName() + ", partId=" + locPart.id() + ", haveHistory=" + haveHistory + "]"); - } } for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { + UUID remoteNodeId = e.getKey(); GridDhtPartitionMap partMap = e.getValue(); if (!partMap.containsKey(p)) continue; - if (partMap.get(p) == OWNING && !owners.contains(e.getKey())) { - if (haveHistory) - partMap.put(p, MOVING); - else { - partMap.put(p, RENTING); + if (partMap.get(p) == OWNING && !owners.contains(remoteNodeId)) { + partMap.put(p, MOVING); - result.add(e.getKey()); - } + if (!haveHistory) + result.add(remoteNodeId); partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion()); @@ -2116,15 +2117,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { this.updateSeq.setIfGreater(partMap.updateSequence()); U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + - "[nodeId=" + e.getKey() + ", grp=" + grp.cacheOrGroupName() + + "[nodeId=" + remoteNodeId + ", grp=" + grp.cacheOrGroupName() + ", partId=" + p + ", haveHistory=" + haveHistory + "]"); } } - if (updateSeq) - node2part = new GridDhtPartitionFullMap(node2part, this.updateSeq.incrementAndGet()); - } - finally { + if (updateSeq) { + long updSeq = this.updateSeq.incrementAndGet(); + + node2part = new GridDhtPartitionFullMap(node2part, updSeq); + } + } finally { 
lock.writeLock().unlock(); } } @@ -2136,9 +2139,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** + * Finds local partitions which don't belong to affinity and runs eviction process for such partitions. + * * @param updateSeq Update sequence. * @param aff Affinity assignments. - * @return Checks if any of the local partitions need to be evicted. + * @return {@code True} if there are local partitions need to be evicted. */ private boolean checkEvictions(long updateSeq, AffinityAssignment aff) { if (!ctx.kernalContext().state().evictionsAllowed()) @@ -2148,6 +2153,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { UUID locId = ctx.localNodeId(); + List<IgniteInternalFuture<?>> rentingFutures = new ArrayList<>(); + for (int p = 0; p < locParts.length(); p++) { GridDhtLocalPartition part = locParts.get(p); @@ -2165,9 +2172,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // If all affinity nodes are owners, then evict partition from local node. if (nodeIds.containsAll(F.nodeIds(affNodes))) { - part.reload(false); - - part.rent(false); + IgniteInternalFuture<?> rentFuture = part.rent(false); + rentingFutures.add(rentFuture); updateSeq = updateLocal(part.id(), part.state(), updateSeq, aff.topologyVersion()); @@ -2192,14 +2198,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ClusterNode n = nodes.get(i); if (locId.equals(n.id())) { - part.reload(false); - - part.rent(false); + IgniteInternalFuture<?> rentFuture = part.rent(false); + rentingFutures.add(rentFuture); - updateSeq = updateLocal(part.id(), - part.state(), - updateSeq, - aff.topologyVersion()); + updateSeq = updateLocal(part.id(), part.state(), updateSeq, aff.topologyVersion()); changed = true; @@ -2217,14 +2219,38 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + // After all rents are finished resend partitions. 
+ if (!rentingFutures.isEmpty()) { + final AtomicInteger rentingPartitions = new AtomicInteger(rentingFutures.size()); + + for (IgniteInternalFuture<?> rentingFuture : rentingFutures) { + rentingFuture.listen(f -> { + int remaining = rentingPartitions.decrementAndGet(); + + if (remaining == 0) { + lock.writeLock().lock(); + + try { + this.updateSeq.incrementAndGet(); + + ctx.exchange().scheduleResendPartitions(); + } + finally { + lock.writeLock().unlock(); + } + } + }); + } + } + return changed; } /** - * Updates value for single partition. + * Updates state of partition in local {@link #node2part} map and recalculates {@link #diffFromAffinity}. * * @param p Partition. - * @param state State. + * @param state Partition state. * @param updateSeq Update sequence. * @param affVer Affinity version. * @return Update sequence. @@ -2302,6 +2328,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** + * Removes node from local {@link #node2part} map and recalculates {@link #diffFromAffinity}. + * * @param nodeId Node to remove. */ private void removeNode(UUID nodeId) { @@ -2346,7 +2374,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (part.own()) { assert lastTopChangeVer.initialized() : lastTopChangeVer; - updateLocal(part.id(), part.state(), updateSeq.incrementAndGet(), lastTopChangeVer); + long updSeq = updateSeq.incrementAndGet(); + + updateLocal(part.id(), part.state(), updSeq, lastTopChangeVer); consistencyCheck(); @@ -2377,9 +2407,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long seq = updateSeq ? 
this.updateSeq.incrementAndGet() : this.updateSeq.get(); - if (part.reload()) - part = createPartition(part.id()); - assert lastTopChangeVer.initialized() : lastTopChangeVer; updateLocal(part.id(), part.state(), seq, lastTopChangeVer); @@ -2590,11 +2617,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** - * @param p Partition. + * Checks that state of partition {@code p} for node {@code nodeId} in local {@code node2part} map + * matches to one of the specified states {@code match} or {@code matches}. + * + * @param p Partition id. * @param nodeId Node ID. * @param match State to match. * @param matches Additional states. - * @return Filter for owners of this partition. + * @return {@code True} if partition matches to one of the specified states. */ private boolean hasState(final int p, @Nullable UUID nodeId, final GridDhtPartitionState match, final GridDhtPartitionState... matches) {