Repository: ignite
Updated Branches:
  refs/heads/ignite-2384 df56be914 -> 9e39e500c


IGNITE-2384 Fixed review notes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9e39e500
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9e39e500
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9e39e500

Branch: refs/heads/ignite-2384
Commit: 9e39e500c5cbdbdb046abade8c838057589b8e66
Parents: df56be9
Author: nikolay_tikhonov <ntikho...@gridgain.com>
Authored: Mon Jan 18 16:27:29 2016 +0300
Committer: nikolay_tikhonov <ntikho...@gridgain.com>
Committed: Mon Jan 18 16:27:29 2016 +0300

----------------------------------------------------------------------
 ...ContinuousQueryFailoverAbstractSelfTest.java | 80 ++++++++++++++++++++
 .../CacheContinuousQueryLostPartitionTest.java  | 77 ++++++++++++++++---
 ...CacheContinuousQueryLostPartitionTxTest.java | 36 ---------
 .../IgniteBinaryCacheQueryTestSuite.java        |  2 -
 4 files changed, 145 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9e39e500/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 283da80..5de3d0f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -41,6 +41,9 @@ import javax.cache.CacheException;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.expiry.TouchedExpiryPolicy;
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
@@ -90,11 +93,13 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
  *
@@ -1278,6 +1283,81 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     /**
      * @throws Exception If failed.
      */
+    public void testBackupQueueEvict() throws Exception {
+        startGridsMultiThreaded(2);
+
+        client = true;
+
+        Ignite qryClient = startGrid(2);
+
+        CacheEventListener1 lsnr = new CacheEventListener1(false);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = qryClient.cache(null).query(qry);
+
+        final Collection<Object> backupQueue = backupQueue(ignite(0));
+
+        assertEquals(0, backupQueue.size());
+
+        long ttl = 100;
+
+        final ExpiryPolicy expiry = new TouchedExpiryPolicy(new 
Duration(MILLISECONDS, ttl));
+
+        final IgniteCache<Object, Object> cache0 = 
ignite(2).cache(null).withExpiryPolicy(expiry);
+
+        final List<Integer> keys = primaryKeys(ignite(1).cache(null), 
BACKUP_ACK_THRESHOLD);
+
+        CountDownLatch latch = new CountDownLatch(keys.size());
+
+        lsnr.latch = latch;
+
+        for (Integer key : keys) {
+            log.info("Put: " + key);
+
+            cache0.put(key, key);
+        }
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return backupQueue.isEmpty();
+            }
+        }, 2000);
+
+        assertTrue("Backup queue is not cleared: " + backupQueue, 
backupQueue.size() < BACKUP_ACK_THRESHOLD);
+
+        boolean wait = waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return cache0.localPeek(keys.get(0)) == null;
+            }
+        }, ttl + 1000);
+
+        assertTrue("Entry evicted.", wait);
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return backupQueue.isEmpty();
+            }
+        }, 2000);
+
+        assertTrue("Backup queue is not cleared: " + backupQueue, 
backupQueue.size() < BACKUP_ACK_THRESHOLD);
+
+        if (backupQueue.size() != 0) {
+            for (Object o : backupQueue) {
+                CacheContinuousQueryEntry e = (CacheContinuousQueryEntry)o;
+
+                assertNotSame("Evicted entry added to backup queue.", -1L, 
e.updateCounter());
+            }
+        }
+
+        cur.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testBackupQueueCleanupServerQuery() throws Exception {
         Ignite qryClient = startGridsMultiThreaded(2);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e39e500/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
index 632a7a3..30613a4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
@@ -37,6 +37,7 @@ import 
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static javax.cache.configuration.FactoryBuilder.factoryOf;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
@@ -51,6 +52,9 @@ public class CacheContinuousQueryLostPartitionTest extends 
GridCommonAbstractTes
     /** Cache name. */
     public static final String CACHE_NAME = "test_cache";
 
+    /** Cache name. */
+    public static final String TX_CACHE_NAME = "tx_test_cache";
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
@@ -66,12 +70,41 @@ public class CacheContinuousQueryLostPartitionTest extends 
GridCommonAbstractTes
     /**
      * @throws Exception If failed.
      */
-    public void testEvent() throws Exception {
-        IgniteCache<Integer, String> cache1 = 
grid(0).getOrCreateCache(CACHE_NAME);
+    public void testTxEvent() throws Exception {
+        testEvent(TX_CACHE_NAME, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicEvent() throws Exception {
+        testEvent(CACHE_NAME, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxClientEvent() throws Exception {
+        testEvent(TX_CACHE_NAME, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicClientEvent() throws Exception {
+        testEvent(CACHE_NAME, true);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @throws Exception If failed.
+     */
+    public void testEvent(String cacheName, boolean client) throws Exception {
+        IgniteCache<Integer, String> cache1 = 
grid(0).getOrCreateCache(cacheName);
 
         final AllEventListener<Integer, String> lsnr1 = 
registerCacheListener(cache1);
 
-        IgniteCache<Integer, String> cache2 = 
grid(1).getOrCreateCache(CACHE_NAME);
+        IgniteCache<Integer, String> cache2 = 
grid(1).getOrCreateCache(cacheName);
 
         Integer key = primaryKey(cache1);
 
@@ -79,7 +112,17 @@ public class CacheContinuousQueryLostPartitionTest extends 
GridCommonAbstractTes
 
         // Note the issue is only reproducible if the second registration is 
done right
         // here, after the first put() above.
-        final AllEventListener<Integer, String> lsnr2 = 
registerCacheListener(cache2);
+        AllEventListener<Integer, String> lsnr20;
+
+        if (client) {
+            IgniteCache<Integer, String> clnCache = 
startGrid(3).getOrCreateCache(cacheName);
+
+            lsnr20 = registerCacheListener(clnCache);
+        }
+        else
+            lsnr20 = registerCacheListener(cache2);
+
+        final AllEventListener<Integer, String> lsnr2 = lsnr20;
 
         assert GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
@@ -127,36 +170,46 @@ public class CacheContinuousQueryLostPartitionTest 
extends GridCommonAbstractTes
      * @param cache Cache.
      * @return Event listener.
      */
-    private AllEventListener<Integer, String> registerCacheListener(
-        IgniteCache<Integer, String> cache) {
+    private AllEventListener<Integer, String> 
registerCacheListener(IgniteCache<Integer, String> cache) {
         AllEventListener<Integer, String> lsnr = new AllEventListener<>();
+
         cache.registerCacheEntryListener(
             new MutableCacheEntryListenerConfiguration<>(factoryOf(lsnr), 
null, true, false));
+
         return lsnr;
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration() throws 
Exception {
-        IgniteConfiguration cfg = super.getConfiguration();
+    @Override protected IgniteConfiguration getConfiguration(String name) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
 
         TcpDiscoverySpi spi = new TcpDiscoverySpi();
 
         spi.setIpFinder(ipFinder);
 
         cfg.setDiscoverySpi(spi);
-        cfg.setCacheConfiguration(cache());
+        cfg.setCacheConfiguration(cache(TX_CACHE_NAME), cache(CACHE_NAME));
+
+        if (name.endsWith("3"))
+            cfg.setClientMode(true);
 
         return cfg;
     }
 
     /**
+     * @param cacheName Cache name.
      * @return Cache configuration.
      */
-    protected CacheConfiguration<Integer, String> cache() {
-        CacheConfiguration<Integer, String> cfg = new 
CacheConfiguration<>(CACHE_NAME);
+    protected CacheConfiguration<Integer, String> cache(String cacheName) {
+        CacheConfiguration<Integer, String> cfg = new 
CacheConfiguration<>(cacheName);
 
         cfg.setCacheMode(PARTITIONED);
-        cfg.setAtomicityMode(ATOMIC);
+
+        if (cacheName.equals(CACHE_NAME))
+            cfg.setAtomicityMode(ATOMIC);
+        else
+            cfg.setAtomicityMode(TRANSACTIONAL);
+
         cfg.setRebalanceMode(SYNC);
         cfg.setWriteSynchronizationMode(PRIMARY_SYNC);
         cfg.setBackups(0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e39e500/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTxTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTxTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTxTest.java
deleted file mode 100644
index bd72dc2..0000000
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTxTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.configuration.CacheConfiguration;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-
-/**
- *
- */
-public class CacheContinuousQueryLostPartitionTxTest extends 
CacheContinuousQueryLostPartitionTest {
-    /** {@inheritDoc} */
-    @Override protected CacheConfiguration<Integer, String> cache() {
-        CacheConfiguration<Integer, String> ccfg = super.cache();
-
-        ccfg.setAtomicityMode(TRANSACTIONAL);
-
-        return ccfg;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e39e500/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index d101493..3bab1f9 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -76,7 +76,6 @@ import 
org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ
 import 
org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
 import 
org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartitionTest;
-import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartitionTxTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
@@ -203,7 +202,6 @@ public class IgniteBinaryCacheQueryTestSuite extends 
TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class);
         
suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class);
-        suite.addTestSuite(CacheContinuousQueryLostPartitionTxTest.class);
 
         // Reduce fields queries.
         suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);

Reply via email to