Repository: ignite Updated Branches: refs/heads/master c917327b7 -> a83f3038a
http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerFailoverTest.java new file mode 100644 index 0000000..241dc2a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerFailoverTest.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.concurrent.CountDownLatch; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + */ +public class CacheContinuousWithTransformerFailoverTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeLeft() throws Exception { + startGrids(3); + + client = true; + + final int CLIENT_ID = 3; + + Ignite clnNode = startGrid(CLIENT_ID); + + client = false; + + IgniteOutClosure<IgniteCache<Integer, Integer>> cache = + new IgniteOutClosure<IgniteCache<Integer, Integer>>() { + int cnt = 0; + + @Override public IgniteCache<Integer, Integer> apply() { + ++cnt; + + return grid(CLIENT_ID).cache(DEFAULT_CACHE_NAME); + } + }; + + final CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQueryWithTransformer<Object, Object, String> qry = new ContinuousQueryWithTransformer<>(); + + qry.setLocalListener(lsnr); + qry.setRemoteTransformerFactory(FactoryBuilder.factoryOf(new IgniteClosure<CacheEntryEvent<?, ?>, String>() { + @Override public String apply(CacheEntryEvent<?, ?> evt) { + return "" + evt.getKey() + evt.getValue(); + } + })); + + QueryCursor<?> cur = clnNode.cache(DEFAULT_CACHE_NAME).query(qry); + + boolean first = true; + + int keyCnt = 1; + + for (int i = 0; i < 10; i++) { + log.info("Start iteration: " + i); + + if (first) + first = false; + else { + for (int srv = 0; srv < CLIENT_ID - 1; srv++) + startGrid(srv); + } + + lsnr.latch = new CountDownLatch(keyCnt); + + for (int key = 0; key < keyCnt; key++) + cache.apply().put(key, key); + + assertTrue("Failed to wait for event. Left events: " + lsnr.latch.getCount(), + lsnr.latch.await(10, SECONDS)); + + for (int srv = 0; srv < CLIENT_ID - 1; srv++) + stopGrid(srv); + } + + tryClose(cur); + } + + /** + * @throws Exception If failed. + */ + public void testTransformerException() throws Exception { + try { + startGrids(1); + + Ignite ignite = ignite(0); + + IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); + + final CountDownLatch latch = new CountDownLatch(10); + + ContinuousQueryWithTransformer<Integer, Integer, Integer> qry = new ContinuousQueryWithTransformer<>(); + + qry.setLocalListener(new EventListener<Integer>() { + /** */ + @LoggerResource + private IgniteLogger log; + + @Override public void onUpdated(Iterable<? extends Integer> evts) throws CacheEntryListenerException { + for (Integer evt : evts) { + log.debug("" + evt); + } + } + }); + + qry.setRemoteTransformerFactory(FactoryBuilder.factoryOf(new IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Integer>, Integer>() { + @Override public Integer apply(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + latch.countDown(); + + throw new RuntimeException("Test error."); + } + })); + + qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheEntryEventSerializableFilter<Integer, Integer>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + return true; + } + })); + + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { + for (int i = 0; i < 10; i++) + cache.put(i, i); + + assertTrue(latch.await(10, SECONDS)); + } + } finally { + stopAllGrids(); + } + } + + /** + * Ensure that every node see every update. + * + * @throws Exception If failed. + */ + public void testCrossCallback() throws Exception { + startGrids(2); + try { + IgniteCache<Integer, Integer> cache1 = grid(0).cache(DEFAULT_CACHE_NAME); + IgniteCache<Integer, Integer> cache2 = grid(1).cache(DEFAULT_CACHE_NAME); + + final int key1 = primaryKey(cache1); + final int key2 = primaryKey(cache2); + + final CountDownLatch latch1 = new CountDownLatch(2); + final CountDownLatch latch2 = new CountDownLatch(2); + + Factory<? extends IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Integer>, Integer>> factory = + FactoryBuilder.factoryOf( + new IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Integer>, Integer>() { + @Override public Integer apply(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + return evt.getKey(); + } + }); + + ContinuousQueryWithTransformer<Integer, Integer, Integer> qry1 = new ContinuousQueryWithTransformer<>(); + + qry1.setRemoteTransformerFactory(factory); + + qry1.setLocalListener(new EventListener<Integer>() { + @Override public void onUpdated(Iterable<? extends Integer> evts) { + for (int evt : evts) { + log.info("Update in cache 1: " + evt); + + if (evt == key1 || evt == key2) + latch1.countDown(); + } + } + }); + + ContinuousQueryWithTransformer<Integer, Integer, Integer> qry2 = new ContinuousQueryWithTransformer<>(); + + qry2.setRemoteTransformerFactory(factory); + + qry2.setLocalListener(new EventListener<Integer>() { + @Override public void onUpdated(Iterable<? extends Integer> evts) { + for (int evt : evts) { + log.info("Update in cache 2: " + evt); + + if (evt == key1 || evt == key2) + latch2.countDown(); + } + } + }); + + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored1 = cache2.query(qry1); + QueryCursor<Cache.Entry<Integer, Integer>> ignored2 = cache2.query(qry2)) { + cache1.put(key1, key1); + cache1.put(key2, key2); + + assertTrue(latch1.await(10, SECONDS)); + assertTrue(latch2.await(10, SECONDS)); + } + } finally { + stopAllGrids(); + } + } + + /** + * @param cur Cur. + */ + private void tryClose(QueryCursor<?> cur) { + try { + cur.close(); + } + catch (Throwable e) { + if (e instanceof IgniteClientDisconnectedException) { + IgniteClientDisconnectedException ex = (IgniteClientDisconnectedException)e; + + ex.reconnectFuture().get(); + + cur.close(); + } + else + throw e; + } + } + + /** + */ + private static class CacheEventListener implements EventListener<String> { + /** */ + public volatile CountDownLatch latch = new CountDownLatch(1); + + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<? extends String> evts) { + for (Object evt : evts) { + log.info("Received cache event: " + evt); + + latch.countDown(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerLocalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerLocalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerLocalSelfTest.java new file mode 100644 index 0000000..f01764f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerLocalSelfTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheMode; + +/** + */ +public class CacheContinuousWithTransformerLocalSelfTest extends CacheContinuousWithTransformerReplicatedSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.LOCAL; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerPartitionedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerPartitionedSelfTest.java new file mode 100644 index 0000000..a76dc98 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerPartitionedSelfTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheMode; + +/** + */ +public class CacheContinuousWithTransformerPartitionedSelfTest extends CacheContinuousWithTransformerReplicatedSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerRandomOperationsTest.java new file mode 100644 index 0000000..fe5ab20 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerRandomOperationsTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.query.AbstractContinuousQuery; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer; + +/** + * Test to check random continuous query operation for ContinuousQueryWithTransformer + */ +public class CacheContinuousWithTransformerRandomOperationsTest extends CacheContinuousQueryRandomOperationsTest { + /** {@inheritDoc} */ + @Override protected <K, V> AbstractContinuousQuery<K, V> createQuery() { + return new ContinuousQueryWithTransformer<>(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest.java new file mode 100644 index 0000000..6035b93 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest.java @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.EventType; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/** + */ +public class CacheContinuousWithTransformerReplicatedSelfTest extends GridCommonAbstractTest { + /** */ + private static final int DFLT_ENTRY_CNT = 10; + + /** */ + private static final int DFLT_LATCH_TIMEOUT = 30_000; + + /** */ + private static final int DFLT_SERVER_NODE_CNT = 1; + + /** */ + private static final String SARAH_CONNOR = "Sarah Connor"; + + /** */ + private static final String JOHN_CONNOR = "John Connor"; + + /** */ + private static final boolean ADD_EVT_FILTER = true; + + /** */ + private static final boolean SKIP_EVT_FILTER = false; + + /** */ + private static final boolean KEEP_BINARY = true; + + /** */ + private static final boolean SKIP_KEEP_BINARY = false; + + /** */ + private static final long LATCH_TIMEOUT = 10_000L; + + /** */ + protected boolean client = false; + + /** */ + protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (client) + cfg.setClientMode(true); + else { + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setCacheMode(cacheMode()); + + cfg.setCacheConfiguration(ccfg); + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + gridToRunQuery().cache(DEFAULT_CACHE_NAME).removeAll(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(DFLT_SERVER_NODE_CNT); + + super.beforeTestsStarted(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(true); + + super.afterTestsStopped(); + } + + /** + * @return Grid to run query on. + * @throws Exception If failed. + */ + protected Ignite gridToRunQuery() throws Exception { + return grid(0); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformer() throws Exception { + runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, SKIP_KEEP_BINARY, false); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerAsync() throws Exception { + runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, SKIP_KEEP_BINARY, true); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerAndRegularListener() throws Exception { + runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, SKIP_KEEP_BINARY, false); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerAndRegularListenerAsync() throws Exception { + runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, SKIP_KEEP_BINARY, true); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerWithFilter() throws Exception { + runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, SKIP_KEEP_BINARY, false); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerWithFilterAsync() throws Exception { + runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, SKIP_KEEP_BINARY, true); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerAndRegularListenerWithFilter() throws Exception { + runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, KEEP_BINARY, false); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerAndRegularListenerWithFilterAsync() throws Exception { + runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, KEEP_BINARY, true); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerKeepBinary() throws Exception { + runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, KEEP_BINARY, false); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerKeepBinaryAsync() throws Exception { + runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, KEEP_BINARY, true); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerAndRegularListenerKeepBinary() throws Exception { + runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, KEEP_BINARY, false); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerAndRegularListenerKeepBinaryAsync() throws Exception { + runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, KEEP_BINARY, true); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerWithFilterKeepBinary() throws Exception { + runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, KEEP_BINARY, false); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerWithFilterKeepBinaryAsync() throws Exception { + runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, KEEP_BINARY, true); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerAndRegularListenerWithFilterKeepBinary() throws Exception { + runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, KEEP_BINARY, false); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousWithTransformerAndRegularListenerWithFilterKeepBinaryAsync() throws Exception { + runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, KEEP_BINARY, true); + } + + /** + * @throws Exception If failed. + */ + public void testTransformerReturnNull() throws Exception { + Ignite ignite = gridToRunQuery(); + + IgniteCache<Integer, Employee> cache = ignite.cache(DEFAULT_CACHE_NAME); + + ContinuousQueryWithTransformer<Integer, Employee, String> qry = new ContinuousQueryWithTransformer<>(); + + final AtomicInteger cnt = new AtomicInteger(0); + + qry.setLocalListener(new EventListener() { + @Override public void onUpdated(Iterable events) throws CacheEntryListenerException { + for (Object e : events) { + assertNull(e); + + cnt.incrementAndGet(); + } + } + }); + + qry.setRemoteTransformerFactory(FactoryBuilder.factoryOf( + new IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Employee>, String>() { + @Override public String apply(CacheEntryEvent<? extends Integer, ? extends Employee> evt) { + return null; + } + })); + + qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheEntryEventSerializableFilter<Integer, Employee>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Employee> evt) { + return true; + } + })); + + try (QueryCursor<Cache.Entry<Integer, Employee>> ignored = cache.query(qry)) { + for (int i = 0; i < 10; i++) + cache.put(i, new Employee(JOHN_CONNOR, i)); + + boolean evtsReceived = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return cnt.get() == 10; + } + }, 20_000); + + assertTrue(evtsReceived); + } + } + + /** + * @throws Exception If failed. + */ + public void testExpired() throws Exception { + Ignite ignite = gridToRunQuery(); + + IgniteCache<Integer, Employee> cache = ignite.cache(DEFAULT_CACHE_NAME); + + cache = cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100))); + + final Set<Integer> keys = new GridConcurrentHashSet<>(); + final CountDownLatch latch = new CountDownLatch(2); + + ContinuousQueryWithTransformer<Integer, Employee, Integer> qry = new ContinuousQueryWithTransformer<>(); + + qry.setIncludeExpired(true); + + qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheEntryEventSerializableFilter<Integer, Employee>() { + @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException { + return event.getEventType() == EventType.EXPIRED; + } + })); + + qry.setRemoteTransformerFactory(FactoryBuilder.factoryOf( + new IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Employee>, Integer>() { + @Override public Integer apply(CacheEntryEvent<? extends Integer, ? extends Employee> evt) { + assertNull(evt.getValue()); + + assertNotNull(evt.getOldValue()); + + return evt.getKey(); + } + })); + + qry.setLocalListener(new EventListener<Integer>() { + @Override public void onUpdated(Iterable<? extends Integer> evts) { + for (Integer key : evts) { + keys.add(key); + + latch.countDown(); + } + } + }); + + try (QueryCursor<Cache.Entry<Integer, Employee>> ignored = cache.query(qry)) { + cache.put(1, new Employee(SARAH_CONNOR, 42)); + cache.put(2, new Employee(JOHN_CONNOR, 42)); + + // Wait for expiration. + latch.await(LATCH_TIMEOUT, MILLISECONDS); + + assertEquals(2, keys.size()); + + assertTrue(keys.contains(1)); + assertTrue(keys.contains(2)); + } + } + + /** + * @param addEvtFilter Add event filter to ContinuousQueryWithTransformer flag. + * @param expTransCnt Expected transformed event count. + * @param keepBinary Keep binary flag. + * @param async Flag to use transformed event listener with {@link IgniteAsyncCallback}. + * @throws Exception If failed. + */ + private void runContinuousQueryWithTransformer(boolean addEvtFilter, int expTransCnt, boolean keepBinary, + boolean async) + throws Exception { + Ignite ignite = gridToRunQuery(); + + IgniteCache<Integer, Employee> cache = ignite.cache(DEFAULT_CACHE_NAME); + + if (keepBinary) + cache = cache.withKeepBinary(); + + populateData(cache, JOHN_CONNOR); + + CountDownLatch transUpdCntLatch = new CountDownLatch(expTransCnt); + + AtomicInteger transCnt = new AtomicInteger(0); + + EventListener<String> transLsnr = async ? + new LocalEventListenerAsync(transCnt, transUpdCntLatch) : + new LocalEventListener(transCnt, transUpdCntLatch); + + Factory<? extends CacheEntryEventFilter> rmtFilterFactory = null; + + if (addEvtFilter) + rmtFilterFactory = FactoryBuilder.factoryOf(new RemoteCacheEntryEventFilter()); + + Factory<? extends IgniteClosure> factory = FactoryBuilder.factoryOf(new RemoteTransformer(keepBinary)); + + ContinuousQueryWithTransformer<Integer, Employee, String> qry = new ContinuousQueryWithTransformer<>(); + + qry.setInitialQuery(new ScanQuery<Integer, Employee>()); + qry.setRemoteFilterFactory((Factory<? extends CacheEntryEventFilter<Integer, Employee>>)rmtFilterFactory); + qry.setRemoteTransformerFactory( + (Factory<? extends IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Employee>, String>>)factory); + qry.setLocalListener(transLsnr); + + try (QueryCursor<Cache.Entry<Integer, Employee>> cur = cache.query(qry)) { + for (Cache.Entry<Integer, Employee> e : cur) { + assertNotNull(e); + + if (keepBinary) { + assertTrue(((BinaryObject)e.getValue()) + .field("name").toString().startsWith(JOHN_CONNOR)); + } + else { + assertTrue(e.getValue().name.startsWith(JOHN_CONNOR)); + } + } + + populateData(cache, SARAH_CONNOR); + + assertTrue("Receive all expected events", + transUpdCntLatch.await(DFLT_LATCH_TIMEOUT, MILLISECONDS)); + assertEquals("Count of updated records equal to expected", expTransCnt, transCnt.get()); + + } + } + + /** + * Put some data to cache. + * + * @param cache Cache to put data to. + * @param name Base name of Employee. + */ + private void populateData(IgniteCache<Integer, Employee> cache, String name) { + for (int i = 0; i < DFLT_ENTRY_CNT; i++) + cache.put(i, new Employee(name + i, 42 * i)); + } + + /** + */ + @IgniteAsyncCallback + private static class LocalEventListenerAsync extends LocalEventListener { + LocalEventListenerAsync(AtomicInteger transCnt, CountDownLatch transUpdCnt) { + super(transCnt, transUpdCnt); + } + } + + /** + */ + private static class RemoteTransformer implements IgniteClosure<CacheEntryEvent<?, ?>, String> { + /** */ + private boolean keepBinary; + + /** */ + RemoteTransformer(boolean keepBinary) { + this.keepBinary = keepBinary; + } + + /** {@inheritDoc} */ + @Override public String apply(CacheEntryEvent<?, ?> evt) { + if (keepBinary) + return ((BinaryObject)evt.getValue()).field("name"); + + return ((Employee)evt.getValue()).name; + } + } + + /** + */ + private static class RemoteCacheEntryEventFilter implements CacheEntryEventSerializableFilter<Integer, Object> { + /** {@inheritDoc} */ + @Override public boolean evaluate( + CacheEntryEvent<? extends Integer, ?> event) throws CacheEntryListenerException { + return event.getKey() % 2 == 0; + } + } + + /** + */ + private static class LocalEventListener implements EventListener<String> { + /** */ + private final AtomicInteger cnt; + + /** */ + private final CountDownLatch cntLatch; + + /** */ + LocalEventListener(AtomicInteger transCnt, CountDownLatch transUpdCnt) { + this.cnt = transCnt; + this.cntLatch = transUpdCnt; + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<? extends String> events) throws CacheEntryListenerException { + for (String evt : events) { + if (evt.startsWith(SARAH_CONNOR)) + cntLatch.countDown(); + + cnt.incrementAndGet(); + } + } + } + + /** + */ + public class Employee { + /** */ + public String name; + + /** */ + public Integer salary; + + /** */ + Employee(String name, Integer salary) { + this.name = name; + this.salary = salary; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index 4ea8bca..486626a 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -32,6 +32,12 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOrderingEventTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTwoNodesTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerFailoverTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerClientSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerLocalSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerPartitionedSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerRandomOperationsTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationNearEnabledTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest; @@ -124,6 +130,13 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(CacheContinuousQueryConcurrentPartitionUpdateTest.class); suite.addTestSuite(CacheContinuousQueryEventBufferTest.class); + suite.addTestSuite(CacheContinuousWithTransformerReplicatedSelfTest.class); + suite.addTestSuite(CacheContinuousWithTransformerLocalSelfTest.class); + suite.addTestSuite(CacheContinuousWithTransformerPartitionedSelfTest.class); + suite.addTestSuite(CacheContinuousWithTransformerClientSelfTest.class); + suite.addTestSuite(CacheContinuousWithTransformerFailoverTest.class); + suite.addTestSuite(CacheContinuousWithTransformerRandomOperationsTest.class); + return suite; } }