This is an automated email from the ASF dual-hosted git repository.

mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 38139fb  GEODE-8329: Fix for durable CQ registration recovery (#5360)
38139fb is described below

commit 38139fbb00cbea872348d3554f9589bd7c5bfdde
Author: Jakov Varenina <62134331+jvaren...@users.noreply.github.com>
AuthorDate: Mon Dec 7 12:35:49 2020 +0100

    GEODE-8329: Fix for durable CQ registration recovery (#5360)
    
    * GEODE-8329: Fix for durable CQ registration recovery
    
    This change fixes an issue where a client without HA configured
    wrongly re-registers durable CQs as non-durable during server
    failover.
    
    * Fix for stressTest
    
    * empty commit to re-launch CI
---
 .../cache/client/internal/QueueManagerImpl.java    |   3 +-
 .../tier/sockets/DurableClientCQDUnitTest.java     | 139 +++++++++++++++++++++
 .../cache/tier/sockets/DurableClientTestBase.java  |  52 ++++++--
 3 files changed, 183 insertions(+), 11 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
index 212d2de..145817c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
@@ -1112,7 +1112,8 @@ public class QueueManagerImpl implements QueueManager {
             .set(((DefaultQueryService) 
this.pool.getQueryService()).getUserAttributes(name));
       }
       try {
-        if (((CqStateImpl) cqi.getState()).getState() != CqStateImpl.INIT) {
+        if (((CqStateImpl) cqi.getState()).getState() != CqStateImpl.INIT
+            && cqi.isDurable() == isDurable) {
           cqi.createOn(recoveredConnection, isDurable);
         }
       } finally {
diff --git 
a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java
 
b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java
index 4b24be9..cc171eb 100644
--- 
a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java
+++ 
b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java
@@ -18,6 +18,7 @@ package org.apache.geode.internal.cache.tier.sockets;
 import static org.apache.geode.cache.Region.SEPARATOR;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -51,10 +52,13 @@ import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
 import org.apache.geode.internal.cache.ClientServerObserverAdapter;
 import org.apache.geode.internal.cache.ClientServerObserverHolder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 
 @Category({ClientSubscriptionTest.class})
@@ -145,6 +149,107 @@ public class DurableClientCQDUnitTest extends 
DurableClientTestBase {
   }
 
   /**
+   * Test that durable CQ is correctly re-registered to new server after the 
failover and
+   * that the durable client functionality works as expected.
+   * Steps:
+   * 1. Start two servers
+   * 2. Start durable client without HA and register durable CQs
+   * 3. Shutdown the server that is hosting CQs subscription queue (primary 
server)
+   * 4. Wait for the durable client to perform the failover to another server
+   * 5. Shutdown the durable client with keepAlive flag set to true
+   * 6. Provision remaining server with the data that should fulfil CQ 
condition and fill the queue
+   * 7. Start the durable client again and check that it receives correct 
events from queue
+   */
+  @Test
+  public void testDurableCQServerFailoverWithoutHAConfigured()
+      throws Exception {
+    String greaterThan5Query = "select * from " + SEPARATOR + regionName + " p 
where p.ID > 5";
+    String allQuery = "select * from " + SEPARATOR + regionName + " p where 
p.ID > -1";
+    String lessThan5Query = "select * from " + SEPARATOR + regionName + " p 
where p.ID < 5";
+
+    // Start server 1
+    server1Port = this.server1VM
+        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, 
Boolean.TRUE));
+
+    // Start server 2
+    server2Port = this.server2VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, Boolean.TRUE});
+
+    // Start a durable client that is kept alive on the server when it stops 
normally
+    durableClientId = getName() + "_client";
+    CacheServerTestUtil.createCacheClient(
+        getClientPool(NetworkUtils.getServerHostName(), server1Port, 
server2Port, true, 0),
+        regionName, getClientDistributedSystemProperties(durableClientId), 
Boolean.TRUE);
+
+    // register non durable cq
+    createCq("GreaterThan5", greaterThan5Query, false).execute();
+
+    // register durable cqs
+    createCq("All", allQuery, true).execute();
+    createCq("LessThan5", lessThan5Query, true).execute();
+
+    // send client ready
+    CacheServerTestUtil.getClientCache().readyForEvents();
+
+    int oldPrimaryPort = getPrimaryServerPort();
+    // Close the server that is hosting subscription queue
+    VM primary = getPrimaryServerVM();
+    // Verify durable client on server
+    
verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, 
durableClientId,
+        primary);
+
+    primary.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
+
+    // Wait until failover to another server is successfully performed
+    waitForFailoverToPerform(oldPrimaryPort);
+    primary = getPrimaryServerVM();
+    waitForDurableClientPresence(durableClientId, primary, 1);
+    int primaryPort = getPrimaryServerPort();
+
+    // Stop the durable client
+    CacheServerTestUtil.closeCache(true);
+
+    // Start normal publisher client
+    startClient(publisherClientVM, primaryPort, regionName);
+
+    // Publish some entries
+    publishEntries(regionName, 10);
+
+    // Restart the durable client
+    CacheServerTestUtil.createCacheClient(
+        getClientPool(NetworkUtils.getServerHostName(), primaryPort, true),
+        regionName, getClientDistributedSystemProperties(durableClientId), 
Boolean.TRUE);
+    assertThat(CacheServerTestUtil.getClientCache()).isNotNull();
+
+    // Re-register non durable cq
+    createCq("GreaterThan5", greaterThan5Query, false).execute();
+
+    // Re-register durable cqs
+    createCq("All", allQuery, true).execute();
+    createCq("LessThan5", lessThan5Query, true).execute();
+
+    // send client ready
+    CacheServerTestUtil.getClientCache().readyForEvents();
+
+    // verify cq events for all 3 cqs
+    checkCqListenerEvents("GreaterThan5", 0 /* numEventsExpected */,
+        /* numEventsToWaitFor */ 15/* secondsToWait */);
+    checkCqListenerEvents("LessThan5", 5 /* numEventsExpected */,
+        /* numEventsToWaitFor */ 15/* secondsToWait */);
+    checkCqListenerEvents("All", 10 /* numEventsExpected */,
+        /* numEventsToWaitFor */ 15/* secondsToWait */);
+
+    primary = getPrimaryServerVM();
+    // Stop the durable client
+    CacheServerTestUtil.closeCache(false);
+    // Stop the publisher client
+    this.publisherClientVM.invoke((SerializableRunnableIF) 
CacheServerTestUtil::closeCache);
+    // Stop the remaining server
+    primary.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
+  }
+
+
+  /**
    * Test functionality to close the cq and drain all events from the ha queue 
from the server This
    * draining should not affect events that still have register interest
    */
@@ -782,6 +887,8 @@ public class DurableClientCQDUnitTest extends 
DurableClientTestBase {
 
   @Test
   public void testGetAllDurableCqsFromServer() {
+
+
     // Start server 1
     server1Port = this.server1VM.invoke(CacheServerTestUtil.class,
         "createCacheServer", new Object[] {regionName, Boolean.TRUE});
@@ -971,6 +1078,38 @@ public class DurableClientCQDUnitTest extends 
DurableClientTestBase {
     vm.invoke(cacheSerializableRunnable);
   }
 
+  public VM getPrimaryServerVM() {
+    if (this.server1Port == getPrimaryServerPort()) {
+      return server1VM;
+    } else {
+      return server2VM;
+    }
+  }
+
+  public int getPrimaryServerPort() {
+    PoolImpl pool = CacheServerTestUtil.getPool();
+    ServerLocation primaryServerLocation = pool.getPrimary();
+    return primaryServerLocation.getPort();
+  }
+
+  public void waitForFailoverToPerform(int oldPrimaryPort) {
+    final PoolImpl pool = CacheServerTestUtil.getPool();
+    WaitCriterion ev = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return pool.getPrimary() != null && pool.getPrimary().getPort() != 
oldPrimaryPort;
+      }
+
+      @Override
+      public String description() {
+        return null;
+      }
+    };
+
+    GeodeAwaitility.await().untilAsserted(ev);
+    assertNotNull(pool.getPrimary());
+  }
+
   void registerDurableCq(final String cqName) {
     // Durable client registers durable cq on server
     this.durableClientVM.invoke(new CacheSerializableRunnable("Register Cq") {
diff --git 
a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java
 
b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java
index 796e58f..d01d5cb 100644
--- 
a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java
+++ 
b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java
@@ -60,10 +60,12 @@ import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.PoolFactoryImpl;
 import org.apache.geode.internal.cache.ha.HARegionQueue;
 import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 
 public class DurableClientTestBase extends JUnit4DistributedTestCase {
@@ -80,9 +82,9 @@ public class DurableClientTestBase extends 
JUnit4DistributedTestCase {
   VM publisherClientVM;
   protected String regionName;
   int server1Port;
+  int server2Port;
   String durableClientId;
 
-
   @Override
   public final void postSetUp() throws Exception {
     this.server1VM = VM.getVM(0);
@@ -172,6 +174,32 @@ public class DurableClientTestBase extends 
JUnit4DistributedTestCase {
     verifyDurableClientPresence(durableClientTimeout, durableClientId, 
serverVM, 0);
   }
 
+  void waitForDurableClientPresence(String durableClientId, VM serverVM, final 
int count) {
+    serverVM.invoke(() -> {
+      if (count > 0) {
+
+        WaitCriterion ev = new WaitCriterion() {
+          @Override
+          public boolean done() {
+            checkNumberOfClientProxies(count);
+            CacheClientProxy proxy = getClientProxy();
+
+            if (proxy != null && durableClientId.equals(proxy.getDurableId())) 
{
+              return true;
+            }
+            return false;
+          }
+
+          @Override
+          public String description() {
+            return null;
+          }
+        };
+        GeodeAwaitility.await().untilAsserted(ev);
+      }
+    });
+  }
+
   void verifyDurableClientPresence(int durableClientTimeout, String 
durableClientId,
       VM serverVM, final int count) {
     serverVM.invoke(() -> {
@@ -372,7 +400,7 @@ public class DurableClientTestBase extends 
JUnit4DistributedTestCase {
     }
   }
 
-  private CqQuery createCq(String cqName, String cqQuery, boolean durable)
+  CqQuery createCq(String cqName, String cqQuery, boolean durable)
       throws CqException, CqExistsException {
     QueryService qs = CacheServerTestUtil.getCache().getQueryService();
     CqAttributesFactory cqf = new CqAttributesFactory();
@@ -461,7 +489,6 @@ public class DurableClientTestBase extends 
JUnit4DistributedTestCase {
     return bridgeServer;
   }
 
-
   Pool getClientPool(String host, int server1Port, int server2Port,
       boolean establishCallbackConnection, int redundancyLevel) {
     PoolFactory pf = PoolManager.createFactory();
@@ -664,16 +691,21 @@ public class DurableClientTestBase extends 
JUnit4DistributedTestCase {
   void checkCqListenerEvents(VM vm, final String cqName, final int numEvents,
       final int secondsToWait) {
     vm.invoke(() -> {
-      QueryService qs = CacheServerTestUtil.getCache().getQueryService();
-      CqQuery cq = qs.getCq(cqName);
-      // Get the listener and wait for the appropriate number of events
-      CacheServerTestUtil.ControlCqListener listener =
-          (CacheServerTestUtil.ControlCqListener) 
cq.getCqAttributes().getCqListener();
-      listener.waitWhileNotEnoughEvents(secondsToWait * 1000, numEvents);
-      assertThat(numEvents).isEqualTo(listener.events.size());
+      checkCqListenerEvents(cqName, numEvents, secondsToWait);
     });
   }
 
+  void checkCqListenerEvents(final String cqName, final int numEvents,
+      final int secondsToWait) {
+    QueryService qs = CacheServerTestUtil.getCache().getQueryService();
+    CqQuery cq = qs.getCq(cqName);
+    // Get the listener and wait for the appropriate number of events
+    CacheServerTestUtil.ControlCqListener listener =
+        (CacheServerTestUtil.ControlCqListener) 
cq.getCqAttributes().getCqListener();
+    listener.waitWhileNotEnoughEvents(secondsToWait * 1000, numEvents);
+    assertThat(numEvents).isEqualTo(listener.events.size());
+  }
+
   void checkListenerEvents(int numberOfEntries, final int sleepMinutes, final 
int eventType,
       final VM vm) {
     vm.invoke(() -> {

Reply via email to