[ https://issues.apache.org/jira/browse/IGNITE-11438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vyacheslav Koptilin updated IGNITE-11438: ----------------------------------------- Description: Please see the attached test: {code:java} /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.ignite.internal.processors.cache.persistence.db; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import javax.cache.expiry.AccessedExpiryPolicy; import javax.cache.expiry.CreatedExpiryPolicy; import javax.cache.expiry.Duration; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.MvccFeatureChecker; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; import static org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED; /** * Test TTL worker with persistence enabled */ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest { /** */ public static final String CACHE_NAME = "expirableCache"; /** */ public static final String GROUP_NAME = "group1"; /** */ public static final int PART_SIZE = 32; /** */ private static final int EXPIRATION_TIMEOUT = 10; /** */ public static final int ENTRIES = 100_000; /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { System.setProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED, "false"); super.beforeTestsStarted(); } /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { super.afterTestsStopped(); System.clearProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED); } /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EXPIRATION); super.beforeTest(); cleanPersistenceDir(); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); //protection if test failed to finish, e.g. by error stopAllGrids(); cleanPersistenceDir(); } /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); final CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setName(CACHE_NAME); ccfg.setGroupName(GROUP_NAME); ccfg.setAffinity(new RendezvousAffinityFunction(false, PART_SIZE)); ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, EXPIRATION_TIMEOUT))); ccfg.setEagerTtl(true); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); cfg.setDataStorageConfiguration( new DataStorageConfiguration() .setDefaultDataRegionConfiguration( new DataRegionConfiguration() .setMaxSize(192L * 1024 * 1024) .setPersistenceEnabled(true) ).setWalMode(WALMode.LOG_ONLY)); cfg.setCacheConfiguration(ccfg); return cfg; } /** * @throws Exception if failed. */ @Test public void testTtlIsApplied() throws Exception { IgniteEx srv = startGrid(0); srv.cluster().active(true); fillCache(srv.cache(CACHE_NAME)); final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME); waitAndCheckExpired(srv, cache); stopAllGrids(); } /** */ protected void fillCache(IgniteCache<Integer, byte[]> cache) { cache.putAll(new TreeMap<Integer, byte[]>() {{ for (int i = 0; i < ENTRIES; i++) put(i, new byte[1024]); }}); //Touch entries. for (int i = 0; i < ENTRIES; i++) cache.get(i); // touch entries } /** */ protected void waitAndCheckExpired( IgniteEx srv, final IgniteCache<Integer, byte[]> cache ) throws IgniteCheckedException { boolean awaited = GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { return cache.size() == 0; } }, TimeUnit.SECONDS.toMillis(EXPIRATION_TIMEOUT + 1)); assertTrue("Cache is not empty.", awaited); GridCacheSharedContext ctx = srv.context().cache().context(); GridCacheContext cctx = ctx.cacheContext(CU.cacheId(CACHE_NAME)); // Check partitions through internal API. for (int partId = 0; partId < PART_SIZE; ++partId) { GridDhtLocalPartition locPart = cctx.dht().topology().localPartition(partId); if (locPart == null) continue; IgniteCacheOffheapManager.CacheDataStore dataStore = ctx.cache().cacheGroup(CU.cacheId(GROUP_NAME)).offheap().dataStore(locPart); GridCursor cur = dataStore.cursor(); assertFalse(cur.next()); assertEquals(0, locPart.fullSize()); } for (int i = 0; i < ENTRIES; i++) assertNull(cache.get(i)); } } {code} was: Please see the attached test: {code:java} /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.ignite.internal.processors.cache.persistence.db; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import javax.cache.expiry.AccessedExpiryPolicy; import javax.cache.expiry.CreatedExpiryPolicy; import javax.cache.expiry.Duration; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.MvccFeatureChecker; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; import static org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED; /** * Test TTL worker with persistence enabled */ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest { /** */ public static final String CACHE_NAME = "expirableCache"; /** */ public static final String GROUP_NAME = "group1"; /** */ public static final int PART_SIZE = 32; /** */ private static final int EXPIRATION_TIMEOUT = 10; /** */ public static final int ENTRIES = 100_000; /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { System.setProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED, "false"); super.beforeTestsStarted(); } /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { super.afterTestsStopped(); System.clearProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED); } /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EXPIRATION); super.beforeTest(); cleanPersistenceDir(); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); //protection if test failed to finish, e.g. by error stopAllGrids(); cleanPersistenceDir(); } /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); final CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setName(CACHE_NAME); ccfg.setGroupName(GROUP_NAME); ccfg.setAffinity(new RendezvousAffinityFunction(false, PART_SIZE)); ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, EXPIRATION_TIMEOUT))); ccfg.setEagerTtl(true); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); cfg.setDataStorageConfiguration( new DataStorageConfiguration() .setDefaultDataRegionConfiguration( new DataRegionConfiguration() .setMaxSize(192L * 1024 * 1024) .setPersistenceEnabled(true) .setCheckpointPageBufferSize(2 * 1024 * 1024) ).setWalMode(WALMode.LOG_ONLY)); cfg.setCacheConfiguration(ccfg); return cfg; } /** * @throws Exception if failed. */ @Test public void testTtlIsApplied() throws Exception { IgniteEx srv = startGrid(0); srv.cluster().active(true); fillCache(srv.cache(CACHE_NAME)); final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME); waitAndCheckExpired(srv, cache); stopAllGrids(); } /** */ protected void fillCache(IgniteCache<Integer, byte[]> cache) { cache.putAll(new TreeMap<Integer, byte[]>() {{ for (int i = 0; i < ENTRIES; i++) put(i, new byte[1024]); }}); //Touch entries. for (int i = 0; i < ENTRIES; i++) cache.get(i); // touch entries } /** */ protected void waitAndCheckExpired( IgniteEx srv, final IgniteCache<Integer, byte[]> cache ) throws IgniteCheckedException { boolean awaited = GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { return cache.size() == 0; } }, TimeUnit.SECONDS.toMillis(EXPIRATION_TIMEOUT + 1)); assertTrue("Cache is not empty.", awaited); GridCacheSharedContext ctx = srv.context().cache().context(); GridCacheContext cctx = ctx.cacheContext(CU.cacheId(CACHE_NAME)); // Check partitions through internal API. for (int partId = 0; partId < PART_SIZE; ++partId) { GridDhtLocalPartition locPart = cctx.dht().topology().localPartition(partId); if (locPart == null) continue; IgniteCacheOffheapManager.CacheDataStore dataStore = ctx.cache().cacheGroup(CU.cacheId(GROUP_NAME)).offheap().dataStore(locPart); GridCursor cur = dataStore.cursor(); assertFalse(cur.next()); assertEquals(0, locPart.fullSize()); } for (int i = 0; i < ENTRIES; i++) assertNull(cache.get(i)); } } {code} > TTL manager may not clean entries from the underlying CacheDataStore > -------------------------------------------------------------------- > > Key: IGNITE-11438 > URL: https://issues.apache.org/jira/browse/IGNITE-11438 > Project: Ignite > Issue Type: Bug > Components: cache > Affects Versions: 2.7 > Reporter: Vyacheslav Koptilin > Priority: Major > > Please see the attached test: > {code:java} > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > package org.apache.ignite.internal.processors.cache.persistence.db; > import java.util.TreeMap; > import java.util.concurrent.TimeUnit; > import javax.cache.expiry.AccessedExpiryPolicy; > import javax.cache.expiry.CreatedExpiryPolicy; > import javax.cache.expiry.Duration; > import javax.cache.expiry.ExpiryPolicy; > import org.apache.ignite.IgniteCache; > import org.apache.ignite.IgniteCheckedException; > import org.apache.ignite.cache.CachePeekMode; > import org.apache.ignite.cache.CacheRebalanceMode; > import org.apache.ignite.cache.CacheWriteSynchronizationMode; > import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; > import org.apache.ignite.configuration.CacheConfiguration; > import org.apache.ignite.configuration.DataRegionConfiguration; > import org.apache.ignite.configuration.DataStorageConfiguration; > import org.apache.ignite.configuration.IgniteConfiguration; > import org.apache.ignite.configuration.WALMode; > import org.apache.ignite.internal.IgniteEx; > import org.apache.ignite.internal.IgniteInterruptedCheckedException; > import org.apache.ignite.internal.processors.cache.GridCacheContext; > import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; > import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; > import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; > import > org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; > import org.apache.ignite.internal.util.lang.GridAbsPredicate; > import org.apache.ignite.internal.util.lang.GridCursor; > import org.apache.ignite.internal.util.typedef.PA; > import org.apache.ignite.internal.util.typedef.internal.CU; > import org.apache.ignite.testframework.GridTestUtils; > import org.apache.ignite.testframework.MvccFeatureChecker; > import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; > import org.junit.Test; > import static > org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED; > /** > * Test TTL worker with persistence enabled > */ > public class IgnitePdsWithTtlTest extends GridCommonAbstractTest { > /** */ > public static final String CACHE_NAME = "expirableCache"; > /** */ > public static final String GROUP_NAME = "group1"; > /** */ > public static final int PART_SIZE = 32; > /** */ > private static final int EXPIRATION_TIMEOUT = 10; > /** */ > public static final int ENTRIES = 100_000; > /** {@inheritDoc} */ > @Override protected void beforeTestsStarted() throws Exception { > System.setProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED, "false"); > super.beforeTestsStarted(); > } > /** {@inheritDoc} */ > @Override protected void afterTestsStopped() throws Exception { > super.afterTestsStopped(); > System.clearProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED); > } > /** {@inheritDoc} */ > @Override protected void beforeTest() throws Exception { > > MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EXPIRATION); > super.beforeTest(); > cleanPersistenceDir(); > } > /** {@inheritDoc} */ > @Override protected void afterTest() throws Exception { > super.afterTest(); > //protection if test failed to finish, e.g. by error > stopAllGrids(); > cleanPersistenceDir(); > } > /** {@inheritDoc} */ > @Override protected IgniteConfiguration getConfiguration(String > igniteInstanceName) throws Exception { > final IgniteConfiguration cfg = > super.getConfiguration(igniteInstanceName); > final CacheConfiguration ccfg = new CacheConfiguration(); > ccfg.setName(CACHE_NAME); > ccfg.setGroupName(GROUP_NAME); > ccfg.setAffinity(new RendezvousAffinityFunction(false, PART_SIZE)); > ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new > Duration(TimeUnit.SECONDS, EXPIRATION_TIMEOUT))); > ccfg.setEagerTtl(true); > > ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); > ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); > cfg.setDataStorageConfiguration( > new DataStorageConfiguration() > .setDefaultDataRegionConfiguration( > new DataRegionConfiguration() > .setMaxSize(192L * 1024 * 1024) > .setPersistenceEnabled(true) > ).setWalMode(WALMode.LOG_ONLY)); > cfg.setCacheConfiguration(ccfg); > return cfg; > } > /** > * @throws Exception if failed. > */ > @Test > public void testTtlIsApplied() throws Exception { > IgniteEx srv = startGrid(0); > srv.cluster().active(true); > fillCache(srv.cache(CACHE_NAME)); > final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME); > waitAndCheckExpired(srv, cache); > stopAllGrids(); > } > /** */ > protected void fillCache(IgniteCache<Integer, byte[]> cache) { > cache.putAll(new TreeMap<Integer, byte[]>() {{ > for (int i = 0; i < ENTRIES; i++) > put(i, new byte[1024]); > }}); > //Touch entries. > for (int i = 0; i < ENTRIES; i++) > cache.get(i); // touch entries > } > /** */ > protected void waitAndCheckExpired( > IgniteEx srv, > final IgniteCache<Integer, byte[]> cache > ) throws IgniteCheckedException { > boolean awaited = GridTestUtils.waitForCondition(new PA() { > @Override public boolean apply() { > return cache.size() == 0; > } > }, TimeUnit.SECONDS.toMillis(EXPIRATION_TIMEOUT + 1)); > assertTrue("Cache is not empty.", awaited); > GridCacheSharedContext ctx = srv.context().cache().context(); > GridCacheContext cctx = ctx.cacheContext(CU.cacheId(CACHE_NAME)); > // Check partitions through internal API. > for (int partId = 0; partId < PART_SIZE; ++partId) { > GridDhtLocalPartition locPart = > cctx.dht().topology().localPartition(partId); > if (locPart == null) > continue; > IgniteCacheOffheapManager.CacheDataStore dataStore = > > ctx.cache().cacheGroup(CU.cacheId(GROUP_NAME)).offheap().dataStore(locPart); > GridCursor cur = dataStore.cursor(); > assertFalse(cur.next()); > assertEquals(0, locPart.fullSize()); > } > for (int i = 0; i < ENTRIES; i++) > assertNull(cache.get(i)); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)