IGNITE-3183 ScanQuery and localEntries are ignored keepBinary flag in OFFHEAP_TIERED mode
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1da14369 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1da14369 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1da14369 Branch: refs/heads/ignite-3212 Commit: 1da14369c886ed12dbe0fc9b2c4d5a568c76b6b2 Parents: f175d3c Author: Anton Vinogradov <a...@apache.org> Authored: Thu Jun 2 17:02:09 2016 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Thu Jun 2 17:02:09 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 43 +- .../processors/cache/GridCacheContext.java | 7 +- .../processors/cache/GridCacheEntryEx.java | 3 +- .../cache/GridCacheEvictionManager.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 4 +- .../processors/cache/GridCacheSwapManager.java | 67 ++- .../distributed/dht/GridDhtCacheAdapter.java | 11 +- .../cache/query/GridCacheQueryManager.java | 9 +- .../processors/cache/GridCacheTestEntryEx.java | 2 +- .../cache/IgniteCachePeekModesAbstractTest.java | 8 +- ...CacheKeepBinaryIterationNearEnabledTest.java | 44 ++ ...acheKeepBinaryIterationStoreEnabledTest.java | 90 ++++ ...CacheKeepBinaryIterationSwapEnabledTest.java | 56 +++ .../CacheKeepBinaryIterationTest.java | 471 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite3.java | 8 + 15 files changed, 762 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3a72ba2..0b3b2da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -707,12 +707,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Collection<Iterator<Cache.Entry<K, V>>> its = new ArrayList<>(); + final boolean keepBinary = ctx.keepBinary(); + if (ctx.isLocal()) { modes.primary = true; modes.backup = true; if (modes.heap) - its.add(iterator(map.entries().iterator(), !ctx.keepBinary())); + its.add(iterator(map.entries().iterator(), !keepBinary)); } else if (modes.heap) { if (modes.near && ctx.isNear()) @@ -721,7 +723,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (modes.primary || modes.backup) { GridDhtCacheAdapter<K, V> cache = ctx.isNear() ? ctx.near().dht() : ctx.dht(); - its.add(cache.localEntriesIterator(modes.primary, modes.backup)); + its.add(cache.localEntriesIterator(modes.primary, modes.backup, keepBinary)); } } @@ -732,10 +734,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap(); if (modes.swap) - its.add(swapMgr.<K, V>swapIterator(modes.primary, modes.backup, topVer)); + its.add(swapMgr.<K, V>swapIterator(modes.primary, modes.backup, topVer, keepBinary)); if (modes.offheap) - its.add(swapMgr.<K, V>offheapIterator(modes.primary, modes.backup, topVer)); + its.add(swapMgr.<K, V>offheapIterator(modes.primary, modes.backup, topVer, keepBinary)); } final Iterator<Cache.Entry<K, V>> it = F.flatIterators(its); @@ -1002,7 +1004,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> entrySetx(final CacheEntryPredicate... filter) { - return new EntrySet(map.entrySet(filter)); + boolean keepBinary = ctx.keepBinary(); + + return new EntrySet(map.entrySet(filter), keepBinary); } /** {@inheritDoc} */ @@ -3806,7 +3810,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Iterator<Cache.Entry<K, V>> it; try { - it = ctx.swap().offheapIterator(true, true, ctx.affinity().affinityTopologyVersion()); + it = ctx.swap().offheapIterator(true, true, ctx.affinity().affinityTopologyVersion(), ctx.keepBinary()); } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); @@ -3817,7 +3821,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V else entry = map.randomEntry(); - return entry == null || entry.obsolete() ? null : entry.<K, V>wrapLazyValue(); + return entry == null || entry.obsolete() ? null : entry.<K, V>wrapLazyValue(ctx.keepBinary()); } /** {@inheritDoc} */ @@ -4672,13 +4676,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * @return Primary entry set. - */ - public Set<Cache.Entry<K, V>> primaryEntrySet() { - return new EntrySet(map.entrySet(CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode()))); - } - - /** * @param key Key. * @param deserializeBinary Deserialize binary flag. * @param needVer Need version. @@ -6689,12 +6686,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** Current entry. */ private GridCacheMapEntry current; + /** Keep binary flag. */ + private final boolean keepBinary; + /** * Constructor. * @param internalIterator Internal iterator. */ - private EntryIterator(Iterator<GridCacheMapEntry> internalIterator) { + private EntryIterator(Iterator<GridCacheMapEntry> internalIterator, boolean keepBinary) { this.internalIterator = internalIterator; + this.keepBinary = keepBinary; } /** {@inheritDoc} */ @@ -6706,7 +6707,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Override public Cache.Entry<K, V> next() { current = internalIterator.next(); - return current.wrapLazyValue(); + return current.wrapLazyValue(keepBinary); } /** {@inheritDoc} */ @@ -6715,7 +6716,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V throw new IllegalStateException(); try { - GridCacheAdapter.this.getAndRemove((K)current.wrapLazyValue().getKey()); + GridCacheAdapter.this.getAndRemove((K)current.wrapLazyValue(keepBinary).getKey()); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -6733,14 +6734,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** Internal set. */ private final Set<GridCacheMapEntry> internalSet; + /** Keep binary flag. */ + private final boolean keepBinary; + /** Constructor. */ - private EntrySet(Set<GridCacheMapEntry> internalSet) { + private EntrySet(Set<GridCacheMapEntry> internalSet, boolean keepBinary) { this.internalSet = internalSet; + this.keepBinary = keepBinary; } /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<K, V>> iterator() { - return new EntryIterator(internalSet.iterator()); + return new EntryIterator(internalSet.iterator(), keepBinary); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 7ad6c77..36d9104 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1155,7 +1155,7 @@ public class GridCacheContext<K, V> implements Externalizable { GridCacheEntryEx e, @Nullable IgnitePredicate<Cache.Entry<K1, V1>>[] p ) throws IgniteCheckedException { - return F.isEmpty(p) || isAll(e.<K1, V1>wrapLazyValue(), p); + return F.isEmpty(p) || isAll(e.<K1, V1>wrapLazyValue(keepBinary()), p); } /** @@ -1687,14 +1687,15 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} if OFFHEAP_TIERED memory mode is enabled. */ public boolean offheapTiered() { - return cacheCfg.getMemoryMode() == OFFHEAP_TIERED && isOffHeapEnabled(); + return cacheCfg != null && cacheCfg.getMemoryMode() == OFFHEAP_TIERED && isOffHeapEnabled(); } /** * @return {@code True} if should use entry with offheap value pointer. */ public boolean useOffheapEntry() { - return cacheCfg.getMemoryMode() == OFFHEAP_TIERED || cacheCfg.getMemoryMode() == OFFHEAP_VALUES; + return cacheCfg != null && + (cacheCfg.getMemoryMode() == OFFHEAP_TIERED || cacheCfg.getMemoryMode() == OFFHEAP_VALUES); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index da9108c..646e6bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -146,10 +146,11 @@ public interface GridCacheEntryEx { /** * Wraps entry to an entry with lazy value get. + * @param keepBinary Keep binary flag. * * @return Entry. */ - public <K, V> Cache.Entry<K, V> wrapLazyValue(); + public <K, V> Cache.Entry<K, V> wrapLazyValue(boolean keepBinary); /** * Peeks value provided to public API entries and to entry filters. http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index c102c58..134e743 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -1389,7 +1389,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { if (log.isDebugEnabled()) log.debug("Notifying eviction policy with entry: " + e); - if (filter == null || filter.evictAllowed(e.wrapLazyValue())) + if (filter == null || filter.evictAllowed(e.wrapLazyValue(cctx.keepBinary()))) plc.onEntryAccessed(e.obsoleteOrDeleted(), e.wrapEviction()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index ee615b8..12bd556 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -4096,10 +4096,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Override public <K, V> Cache.Entry<K, V> wrapLazyValue() { + @Override public <K, V> Cache.Entry<K, V> wrapLazyValue(boolean keepBinary) { CacheOperationContext opCtx = cctx.operationContextPerCall(); - return new LazyValueEntry<>(key, opCtx != null && opCtx.isKeepBinary()); + return new LazyValueEntry<>(key, keepBinary); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 30159fa..cc3261c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1607,14 +1607,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * @param keepBinary Keep binary flag. * @return Lazy swap iterator. * @throws IgniteCheckedException If failed. */ - public <K, V> Iterator<Map.Entry<K, V>> lazySwapIterator() throws IgniteCheckedException { + public <K, V> Iterator<Map.Entry<K, V>> lazySwapIterator(boolean keepBinary) throws IgniteCheckedException { if (!swapEnabled) return new GridEmptyIterator<>(); - return lazyIterator(cctx.gridSwap().rawIterator(spaceName)); + return lazyIterator(cctx.gridSwap().rawIterator(spaceName), keepBinary); } /** @@ -1667,13 +1668,14 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * @param keepBinary Keep binary flag. * @return Lazy off-heap iterator. */ - public <K, V> Iterator<Map.Entry<K, V>> lazyOffHeapIterator() { + public <K, V> Iterator<Map.Entry<K, V>> lazyOffHeapIterator(boolean keepBinary) { if (!offheapEnabled) return new GridEmptyCloseableIterator<>(); - return lazyIterator(offheap.iterator(spaceName)); + return lazyIterator(offheap.iterator(spaceName), keepBinary); } /** @@ -1698,10 +1700,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * Gets lazy iterator for which key and value are lazily deserialized. * * @param it Closeable iterator. + * @param keepBinary Keep binary. * @return Lazy iterator. */ private <K, V> Iterator<Map.Entry<K, V>> lazyIterator( - final GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> it) { + final GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> it, final boolean keepBinary) { if (it == null) return new GridEmptyIterator<>(); @@ -1714,7 +1717,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { @Override protected Map.Entry<K, V> onNext() { final Map.Entry<byte[], byte[]> cur0 = it.next(); - cur = new GridVersionedMapEntry<K, V>(cur0); + cur = new GridVersionedMapEntry<K, V>(cur0, keepBinary); return cur; } @@ -2043,11 +2046,14 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @param primary If {@code true} includes primary entries. * @param backup If {@code true} includes backup entries. * @param topVer Topology version. + * @param keepBinary Keep binary flag. * @return Swap entries iterator. * @throws IgniteCheckedException If failed. */ - public <K, V> Iterator<Cache.Entry<K, V>> swapIterator(boolean primary, boolean backup, AffinityTopologyVersion topVer) - throws IgniteCheckedException + public <K, V> Iterator<Cache.Entry<K, V>> swapIterator(boolean primary, + boolean backup, + AffinityTopologyVersion topVer, + boolean keepBinary) throws IgniteCheckedException { assert primary || backup; @@ -2055,12 +2061,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { return new GridEmptyIterator<>(); if (primary && backup) - return cacheEntryIterator(this.<K, V>lazySwapIterator()); + return cacheEntryIterator(this.<K, V>lazySwapIterator(keepBinary)); Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) : cctx.affinity().backupPartitions(cctx.localNodeId(), topVer); - return new PartitionsIterator<K, V>(parts) { + return new PartitionsIterator<K, V>(parts, keepBinary) { @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> nextPartition(int part) throws IgniteCheckedException { @@ -2073,12 +2079,14 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @param primary If {@code true} includes primary entries. * @param backup If {@code true} includes backup entries. * @param topVer Topology version. + * @param keepBinary Keep binary flag. * @return Offheap entries iterator. * @throws IgniteCheckedException If failed. */ public <K, V> Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary, boolean backup, - AffinityTopologyVersion topVer) + AffinityTopologyVersion topVer, + boolean keepBinary) throws IgniteCheckedException { assert primary || backup; @@ -2087,12 +2095,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { return new GridEmptyIterator<>(); if (primary && backup) - return cacheEntryIterator(this.<K, V>lazyOffHeapIterator()); + return cacheEntryIterator(this.<K, V>lazyOffHeapIterator(keepBinary)); Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) : cctx.affinity().backupPartitions(cctx.localNodeId(), topVer); - return new PartitionsIterator<K, V>(parts) { + return new PartitionsIterator<K, V>(parts, keepBinary) { @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> nextPartition(int part) { return offheap.iterator(spaceName, part); } @@ -2314,17 +2322,25 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * */ private abstract class PartitionsIterator<K, V> extends PartitionsAbstractIterator<Cache.Entry<K, V>> { + /** */ + private final boolean keepBinary; + /** * @param parts Partitions + * @param keepBinary Keep binary flag. */ - public PartitionsIterator(Collection<Integer> parts) { + public PartitionsIterator(Collection<Integer> parts, boolean keepBinary) { super(parts); + + this.keepBinary = keepBinary; + + advance(); } /** {@inheritDoc} */ @Override protected Iterator<Cache.Entry<K, V>> partitionIterator(int part) throws IgniteCheckedException { - return cacheEntryIterator(GridCacheSwapManager.this.<K, V>lazyIterator(nextPartition(part))); + return cacheEntryIterator(GridCacheSwapManager.this.<K, V>lazyIterator(nextPartition(part), keepBinary)); } /** @@ -2350,12 +2366,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { private T next; /** - * @param parts Partitions + * @param parts Partitions. */ public PartitionsAbstractIterator(Collection<Integer> parts) { this.partIt = parts.iterator(); - - advance(); } /** {@inheritDoc} */ @@ -2383,7 +2397,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * Switches to next element. */ - private void advance() { + protected final void advance() { next = null; do { @@ -2527,23 +2541,26 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { */ private class GridVersionedMapEntry<K,V> implements Map.Entry<K,V>, GridCacheVersionAware { /** */ - private Map.Entry<byte[], byte[]> entry; + final private Map.Entry<byte[], byte[]> entry; + + /** */ + final private boolean keepBinary; /** * Constructor. * * @param entry Entry. + * @param keepBinary Keep binary. */ - public GridVersionedMapEntry(Map.Entry<byte[], byte[]> entry) { + public GridVersionedMapEntry(Map.Entry<byte[], byte[]> entry, boolean keepBinary) { this.entry = entry; + this.keepBinary = keepBinary; } /** {@inheritDoc} */ @Override public K getKey() { try { - KeyCacheObject key = cctx.toCacheKeyObject(entry.getKey()); - - return key.value(cctx.cacheObjectContext(), false); + return (K)cctx.unwrapBinaryIfNeeded(cctx.toCacheKeyObject(entry.getKey()), keepBinary); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -2557,7 +2574,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { assert e != null; - return e.value().value(cctx.cacheObjectContext(), false); + return (V)cctx.unwrapBinaryIfNeeded(e.value(), keepBinary); } catch (IgniteCheckedException ex) { throw new IgniteException(ex); http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 5688b61..2ab6303 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -1217,13 +1217,16 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * @param primary If {@code true} includes primary entries. * @param backup If {@code true} includes backup entries. + * @param keepBinary Keep binary flag. * @return Local entries iterator. */ - public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary, final boolean backup) { + public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary, + final boolean backup, + final boolean keepBinary) { assert primary || backup; if (primary && backup) - return iterator(entries().iterator(), !ctx.keepBinary()); + return iterator(entries().iterator(), !keepBinary); else { final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); @@ -1287,7 +1290,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } }; - return iterator(it, !ctx.keepBinary()); + return iterator(it, !keepBinary); } } @@ -1352,7 +1355,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (next instanceof GridCacheMapEntry && (!((GridCacheMapEntry)next).visitable(CU.empty0()))) continue; - entry = next.wrapLazyValue(); + entry = next.wrapLazyValue(ctx.keepBinary()); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index be2a85c..6729d41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -977,7 +977,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (locNode && plc == null && !cctx.isLocal()) { GridDhtCacheAdapter<K, V> cache = cctx.isNear() ? cctx.near().dht() : cctx.dht(); - final Iterator<Cache.Entry<K, V>> iter = cache.localEntriesIterator(true, backups); + final Iterator<Cache.Entry<K, V>> iter = cache.localEntriesIterator(true, + backups, + cache.context().keepBinary()); return new GridIteratorAdapter<IgniteBiTuple<K, V>>() { /** */ @@ -1157,8 +1159,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte K key = e.key(); V val = e.value(); + key = (K)cctx.unwrapBinaryIfNeeded(key, keepBinary); + if (filter != null || locNode) { - key = (K)cctx.unwrapBinaryIfNeeded(key, keepBinary); val = (V)cctx.unwrapBinaryIfNeeded(val, keepBinary); } @@ -2673,7 +2676,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (key != null) return key; - key = cctx.toCacheKeyObject(keyBytes()).value(cctx.cacheObjectContext(), false); + key = (K)cctx.toCacheKeyObject(keyBytes()); return key; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index a1153cd..348b6c9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -349,7 +349,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** @inheritDoc */ - @Override public Cache.Entry wrapLazyValue() { + @Override public Cache.Entry wrapLazyValue(boolean keepBinary) { assert false; return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java index 056affc..c27cccb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java @@ -292,9 +292,9 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra Iterator<Map.Entry<Integer, String>> offheapIt; if (internalCache.context().isNear()) - offheapIt = internalCache.context().near().dht().context().swap().lazyOffHeapIterator(); + offheapIt = internalCache.context().near().dht().context().swap().lazyOffHeapIterator(false); else - offheapIt = internalCache.context().swap().lazyOffHeapIterator(); + offheapIt = internalCache.context().swap().lazyOffHeapIterator(false); while (offheapIt.hasNext()) { Map.Entry<Integer, String> e = offheapIt.next(); @@ -704,9 +704,9 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra Iterator<Map.Entry<Integer, String>> offheapIt; if (internalCache.context().isNear()) - offheapIt = internalCache.context().near().dht().context().swap().lazyOffHeapIterator(); + offheapIt = internalCache.context().near().dht().context().swap().lazyOffHeapIterator(false); else - offheapIt = internalCache.context().swap().lazyOffHeapIterator(); + offheapIt = internalCache.context().swap().lazyOffHeapIterator(false); Affinity aff = ignite(nodeIdx).affinity(null); http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationNearEnabledTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationNearEnabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationNearEnabledTest.java new file mode 100644 index 0000000..02d72b8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationNearEnabledTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; + +/** + * + */ +public class CacheKeepBinaryIterationNearEnabledTest extends CacheKeepBinaryIterationTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode) { + CacheConfiguration<Object, Object> ccfg = + super.cacheConfiguration(cacheMode, backups, atomicityMode, memoryMode); + + ccfg.setNearConfiguration(new NearCacheConfiguration<>()); + + return ccfg; + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationStoreEnabledTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationStoreEnabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationStoreEnabledTest.java new file mode 100644 index 0000000..f345611 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationStoreEnabledTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; + +/** + * + */ +public class CacheKeepBinaryIterationStoreEnabledTest extends CacheKeepBinaryIterationTest { + /** Cache store. */ + private static TestStore store = new TestStore(); + + /** {@inheritDoc} */ + @Override protected CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode) { + CacheConfiguration<Object, Object> ccfg = + super.cacheConfiguration(cacheMode, backups, atomicityMode, memoryMode); + + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + ccfg.setCacheStoreFactory(singletonFactory(store)); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setLoadPreviousValue(true); + + return ccfg; + } + + /** + * + */ + private static class TestStore extends CacheStoreAdapter<Object, Object> { + /** Map. */ + private ConcurrentMap<Object, Object> map = new ConcurrentHashMap<>(); + + /** + * @param key Key. + * @return Value. + */ + Object value(Object key) { + return map.get(key); + } + + /** @return {@code True} if empty. */ + boolean isEmpty() { + return map.isEmpty(); + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(javax.cache.Cache.Entry<? extends Object, ? extends Object> e) { + map.put(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + map.remove(key); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationSwapEnabledTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationSwapEnabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationSwapEnabledTest.java new file mode 100644 index 0000000..fe619c4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationSwapEnabledTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi; + +/** + * + */ +public class CacheKeepBinaryIterationSwapEnabledTest extends CacheKeepBinaryIterationTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode) { + CacheConfiguration<Object, Object> ccfg = + super.cacheConfiguration(cacheMode, backups, atomicityMode, memoryMode); + + ccfg.setSwapEnabled(true); + ccfg.setOffHeapMaxMemory(1000); + ccfg.setEvictionPolicy(new LruEvictionPolicy(10)); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setSwapSpaceSpi(new FileSwapSpaceSpi()); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java new file mode 100644 index 0000000..a775d21 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import javax.cache.Cache; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.config.GridTestProperties; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class CacheKeepBinaryIterationTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 3; + + /** */ + private static final int KEYS = 1025; + + static { + GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName()); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(getServerNodeCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnHeap() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + ONHEAP_TIERED + ); + + doTestScanQuery(ccfg, true, true); + doTestScanQuery(ccfg, true, false); + doTestScanQuery(ccfg, false, true); + doTestScanQuery(ccfg, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffHeap() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + OFFHEAP_TIERED + ); + + doTestScanQuery(ccfg, true, true); + doTestScanQuery(ccfg, true, false); + doTestScanQuery(ccfg, false, true); + doTestScanQuery(ccfg, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnHeap() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + ONHEAP_TIERED + ); + + doTestScanQuery(ccfg, true, true); + doTestScanQuery(ccfg, true, false); + doTestScanQuery(ccfg, false, true); + doTestScanQuery(ccfg, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffHeap() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_TIERED + ); + + doTestScanQuery(ccfg, true, true); + doTestScanQuery(ccfg, true, false); + doTestScanQuery(ccfg, false, true); + doTestScanQuery(ccfg, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnHeapLocalEntries() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + ONHEAP_TIERED + ); + + doTestLocalEntries(ccfg, true, true); + doTestLocalEntries(ccfg, true, false); + doTestLocalEntries(ccfg, false, true); + doTestLocalEntries(ccfg, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffHeapLocalEntries() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + OFFHEAP_TIERED + ); + + doTestLocalEntries(ccfg, true, true); + doTestLocalEntries(ccfg, true, false); + doTestLocalEntries(ccfg, false, true); + doTestLocalEntries(ccfg, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnHeapLocalEntries() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + ONHEAP_TIERED + ); + + doTestLocalEntries(ccfg, true, true); + doTestLocalEntries(ccfg, true, false); + doTestLocalEntries(ccfg, false, true); + doTestLocalEntries(ccfg, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffHeapLocalEntries() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_TIERED + ); + + doTestLocalEntries(ccfg, true, true); + doTestLocalEntries(ccfg, true, false); + doTestLocalEntries(ccfg, false, true); + doTestLocalEntries(ccfg, false, false); + } + + /** + * @param ccfg Cache configuration. + */ + private void doTestScanQuery(CacheConfiguration<Object, Object> ccfg, boolean keepBinary, + boolean primitives) throws IgniteInterruptedCheckedException { + IgniteCache<Object, Object> cache = grid(0).createCache(ccfg); + + assertTrue(cache.size() == 0); + + try { + for (int i = 0; i < KEYS; i++) + if (primitives) + cache.put(i, i); + else + cache.put(new QueryTestKey(i), new QueryTestValue(i)); + + for (int i = 0; i < getServerNodeCount(); i++) { + IgniteCache<Object, Object> cache0 = grid(i).cache(ccfg.getName()); + + if (keepBinary) + cache0 = cache0.withKeepBinary(); + + ScanQuery<Object, Object> qry = new ScanQuery<>(); + + qry.setLocal(true); + + int size = 0; + + try (QueryCursor<Cache.Entry<Object, Object>> cur = cache0.query(qry)) { + for (Cache.Entry<Object, Object> e : cur) { + Object key = e.getKey(); + Object val = e.getValue(); + + if (!primitives) { + assertTrue("Got unexpected object: " + key.getClass() + ", keepBinary: " + keepBinary, + keepBinary == key instanceof BinaryObject); + assertTrue("Got unexpected object: " + val.getClass() + ", keepBinary: " + keepBinary, + keepBinary == val instanceof BinaryObject); + } + else { + assertTrue("Got unexpected object: " + key.getClass() + ", keepBinary: " + keepBinary, + key instanceof Integer); + assertTrue("Got unexpected object: " + val.getClass() + ", keepBinary: " + keepBinary, + val instanceof Integer); + } + + ++size; + } + } + + assertTrue(size > 0); + } + } + finally { + cache.removeAll(); + + if (ccfg.getEvictionPolicy() != null) + U.sleep(1000); // Fixes evictionPolicy issues at cache destroy. + + grid(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param ccfg Cache configuration. + */ + private void doTestLocalEntries(CacheConfiguration<Object, Object> ccfg, + boolean keepBinary, + boolean primitives) throws IgniteInterruptedCheckedException { + IgniteCache<Object, Object> cache = grid(0).createCache(ccfg); + + assertTrue(cache.size() == 0); + + try { + for (int i = 0; i < KEYS; i++) + if (primitives) + cache.put(i, i); + else + cache.put(new QueryTestKey(i), new QueryTestValue(i)); + + for (int i = 0; i < getServerNodeCount(); i++) { + IgniteCache<Object, Object> cache0 = grid(i).cache(ccfg.getName()); + + if (keepBinary) + cache0 = cache0.withKeepBinary(); + + for (CachePeekMode mode : CachePeekMode.values()) { + int size = 0; + + for (Cache.Entry<Object, Object> e : cache0.localEntries(mode)) { + Object key = e.getKey(); + Object val = e.getValue(); + + if (!primitives) { + assertTrue("Got unexpected object: " + key.getClass() + ", keepBinary: " + keepBinary, + keepBinary == key instanceof BinaryObject); + assertTrue("Got unexpected object: " + key.getClass() + ", keepBinary: " + keepBinary, + keepBinary == val instanceof BinaryObject); + } + else { + assertTrue("Got unexpected object: " + key.getClass() + ", keepBinary: " + keepBinary, + key instanceof Integer); + assertTrue("Got unexpected object: " + key.getClass() + ", keepBinary: " + keepBinary, + val instanceof Integer); + } + + ++size; + } + + if (mode == CachePeekMode.ALL || + mode == CachePeekMode.PRIMARY || + mode == CachePeekMode.BACKUP || + (mode == CachePeekMode.NEAR && i == 0 && + ccfg.getMemoryMode() == CacheMemoryMode.ONHEAP_TIERED && + ccfg.getNearConfiguration() != null) || + (mode == CachePeekMode.ONHEAP && ccfg.getMemoryMode() == CacheMemoryMode.ONHEAP_TIERED) || + (mode == CachePeekMode.OFFHEAP && ccfg.getMemoryMode() == CacheMemoryMode.OFFHEAP_TIERED) || + (mode == CachePeekMode.SWAP && ccfg.isSwapEnabled())) + assertTrue("Zero result at mode: " + mode, size > 0); + } + } + } + finally { + cache.removeAll(); + + if (ccfg.getEvictionPolicy() != null) + U.sleep(1000); // Fixes evictionPolicy issues at cache destroy. + + grid(0).destroyCache(ccfg.getName()); + } + } + + /** + * @return Count nodes. + */ + protected int getServerNodeCount() { + return NODES; + } + + /** + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @return Cache configuration. + */ + protected CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * + */ + public static class QueryTestKey implements Serializable { + /** */ + private final Integer key; + + /** + * @param key Key. + */ + public QueryTestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestKey that = (QueryTestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestKey.class, this); + } + } + + /** + * + */ + public static class QueryTestValue implements Serializable { + /** */ + @GridToStringInclude + protected final Integer val1; + + /** */ + @GridToStringInclude + protected final String val2; + + /** + * @param val Value. + */ + public QueryTestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestValue that = (QueryTestValue)o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestValue.class, this); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1da14369/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index dbbb3ed..ce02823 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -30,6 +30,10 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOrderingEventTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTwoNodesTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationNearEnabledTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationSwapEnabledTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapValuesTest; @@ -105,6 +109,10 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(CacheContinuousQueryExecuteInPrimaryTest.class); suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class); suite.addTestSuite(IgniteCacheContinuousQueryImmutableEntryTest.class); + suite.addTestSuite(CacheKeepBinaryIterationTest.class); + suite.addTestSuite(CacheKeepBinaryIterationStoreEnabledTest.class); + suite.addTestSuite(CacheKeepBinaryIterationSwapEnabledTest.class); + suite.addTestSuite(CacheKeepBinaryIterationNearEnabledTest.class); return suite; }