This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f7bb77c  GEODE-5056: when found the dropped events at primary sender, send (#1794)
f7bb77c is described below

commit f7bb77c89a3d19673e8929275fc6c407a4b382bd
Author: Xiaojian Zhou <gesterz...@users.noreply.github.com>
AuthorDate: Fri Apr 13 10:09:22 2018 -0700

    GEODE-5056: when found the dropped events at primary sender, send (#1794)
    
    QueueRemovalMessage for it
---
 .../cache/wan/AbstractGatewaySenderEventProcessor.java  |  2 +-
 .../cache/wan/parallel/ParallelGatewaySenderQueue.java  | 17 ++++++++++++++++-
 .../ParallelGatewaySenderOperationsDUnitTest.java       |  2 --
 .../serial/SerialGatewaySenderOperationsDUnitTest.java  |  2 ++
 4 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index eea7480..34d511c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -304,7 +304,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
           (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
       boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
       if (isPrimary) {
-        pgsq.addRemovedEvent(prQ, bucketId, shadowKey);
+        pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey);
         this.sender.getStatistics().incEventsNotQueuedAtYetRunningPrimarySender();
         if (logger.isDebugEnabled()) {
           logger.debug("register dropped event for primary queue. BucketId is " + bucketId
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 89880fc..cdb33ab 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1112,7 +1112,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   // This method may need synchronization in case it is used by
   // ConcurrentParallelGatewaySender
-  public void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
+  protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
     StoppableReentrantLock lock = buckToDispatchLock;
     if (lock != null) {
       lock.lock();
@@ -1133,6 +1133,21 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     }
   }
 
+  public void sendQueueRemovalMesssageForDroppedEvent(PartitionedRegion prQ, int bucketId,
+      Object key) {
+    final HashMap<String, Map<Integer, List>> temp = new HashMap<String, Map<Integer, List>>();
+    Map bucketIdToDispatchedKeys = new ConcurrentHashMap();
+    temp.put(prQ.getFullPath(), bucketIdToDispatchedKeys);
+    addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key);
+    Set<InternalDistributedMember> recipients =
+        removalThread.getAllRecipients(sender.getCache(), temp);
+    if (!recipients.isEmpty()) {
+      ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp);
+      pqrm.setRecipients(recipients);
+      sender.getCache().getInternalDistributedSystem().getDistributionManager().putOutgoing(pqrm);
+    }
+  }
+
   private void addRemovedEventToMap(Map bucketIdToDispatchedKeys, int bucketId, Object key) {
     List dispatchedKeys = (List) bucketIdToDispatchedKeys.get(bucketId);
     if (dispatchedKeys == null) {
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index a1808f5..f5b98b7 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -37,7 +37,6 @@ import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.RMIException;
 import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.categories.WanTest;
 
 /**
@@ -377,7 +376,6 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
    * case in the way that when the sender is starting from stopped state, puts are simultaneously
    * happening on the region by another thread.
    */
-  @Category({FlakyTest.class, WanTest.class}) // GEODE-5056
   @Test
   public void testParallelPropagationSenderStartAfterStop_Scenario2() throws Exception {
     addIgnoredException("Broken pipe");
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index 329464d..c2375c2 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -48,6 +48,7 @@ import org.apache.geode.test.dunit.RMIException;
 import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.categories.WanTest;
 
 /**
@@ -266,6 +267,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
   }
 
+  @Category({FlakyTest.class, WanTest.class}) // GEODE-5056
   @Test
   public void testRestartSerialGatewaySendersWhilePutting() throws Throwable {
     Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));

-- 
To stop receiving notification emails like this one, please contact
zho...@apache.org.

Reply via email to