xiaojian zhou created GEODE-4659:
------------------------------------

             Summary: AbstractGatewaySenderEventProcessor put loop of filter in 
wrong place
                 Key: GEODE-4659
                 URL: https://issues.apache.org/jira/browse/GEODE-4659
             Project: Geode
          Issue Type: New Feature
          Components: wan
            Reporter: xiaojian zhou


{noformat}
While fixing GEODE-3967, I found that the loop over the event filters is in the wrong place.

 

The processing that ignores UPDATE_VERSION_STAMP events and events with CME 
should have nothing to do with filters. However, that processing currently 
sits inside the loop over the filters, so if no filter is defined, the code 
does not ignore UPDATE_VERSION_STAMP events or events with CME.

 

However, once this problem is fixed, GEODE-3967 has more race conditions to be 
fixed (I have fixed several of them). It looks like this bug hid other race 
conditions and kept them from surfacing. 

 

Given the time constraint, I will not fix the filter issue as part of 
GEODE-3967; instead, I am logging this bug for future reference. 

 
Here is the diff to fix this bug:
diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
index 8739a8f72..a3a89fbd0 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
@@ -81,40 +81,8 @@ public class RemoteParallelGatewaySenderEventProcessor 
extends ParallelGatewaySe
    * @param disp
    * @return true if remote site Gemfire Version is >= 7.0.1
    */
-  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp)
-      throws GatewaySenderException {
-    try {
-      GatewaySenderEventRemoteDispatcher remoteDispatcher =
-          (GatewaySenderEventRemoteDispatcher) disp;
-      // This will create a new connection if no batch has been sent till
-      // now.
-      Connection conn = remoteDispatcher.getConnection(false);
-      if (conn != null) {
-        short remoteSiteVersion = conn.getWanSiteVersion();
-        if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) {
-          return true;
-        }
-      }
-    } catch (GatewaySenderException e) {
-      Throwable cause = e.getCause();
-      if (cause instanceof IOException || e instanceof 
GatewaySenderConfigurationException
-          || cause instanceof ConnectionDestroyedException) {
-        try {
-          int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL;
-          if (logger.isDebugEnabled()) {
-            logger.debug("Sleeping for {} milliseconds", sleepInterval);
-          }
-          Thread.sleep(sleepInterval);
-        } catch (InterruptedException ie) {
-          // log the exception
-          if (logger.isDebugEnabled()) {
-            logger.debug(ie.getMessage(), ie);
-          }
-        }
-      }
-      throw e;
-    }
-    return false;
+  protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) 
{
+    return true;
   }
}
diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
index 69005e02b..da5d1baee 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
@@ -19,6 +19,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
import org.apache.geode.internal.logging.LogService;

@@ -44,4 +45,14 @@ public class RemoteSerialGatewaySenderEventProcessor extends 
SerialGatewaySender
     }
   }

+  /**
+   * Returns if corresponding receiver WAN site of this GatewaySender has 
GemfireVersion > 7.0.1
+   *
+   * @param disp
+   * @return true if remote site Gemfire Version is >= 7.0.1
+   */
+  protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) 
{
+    return true;
+  }
+
}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 7e67e9bfb..439394382 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -509,27 +509,38 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
           }
           // Filter the events
-          for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
-            Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
-            while (itr.hasNext()) {
-              GatewayQueueEvent event = itr.next();
-
-              // This seems right place to prevent transmission of 
UPDATE_VERSION events if
-              // receiver's
-              // version is < 7.0.1, especially to prevent another loop over 
events.
-              if (!sendUpdateVersionEvents
-                  && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
-                if (isTraceEnabled) {
-                  logger.trace(
-                      "Update Event Version event: {} removed from Gateway 
Sender queue: {}", event,
-                      sender);
-                }
+          Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
+          while (itr.hasNext()) {
+            GatewayQueueEvent event = itr.next();
+
+            // This seems right place to prevent transmission of 
UPDATE_VERSION events if
+            // receiver's
+            // version is < 7.0.1, especially to prevent another loop over 
events.
+            if (!sendUpdateVersionEvents
+                && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
+              if (isDebugEnabled) {
+                logger.debug("Update Event Version event: {} removed from 
Gateway Sender queue: {}",
+                    event, sender);
+              }

-                itr.remove();
-                statistics.incEventsNotQueued();
-                continue;
+              itr.remove();
+              statistics.incEventsNotQueued();
+              continue;
+            }
+
+            if (((GatewaySenderEventImpl) event).isConcurrencyConflict()) {
+              if (isDebugEnabled) {
+                logger.debug(
+                    "Event with concurrent modification conflict: {} will be 
removed from Gateway Sender queue: {}",
+                    event, sender);
               }

+              itr.remove();
+              statistics.incEventsNotQueued();
+              continue;
+            }
+
+            for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
               boolean transmit = filter.beforeTransmit(event);
               if (!transmit) {
                 if (isDebugEnabled) {
@@ -538,6 +549,7 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
                 }
                 itr.remove();
                 statistics.incEventsFiltered();
+                break;
               }
             }
           }
@@ -550,9 +562,9 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
           // AsyncEventQueue since possibleDuplicate flag is not used in WAN.
           if (this.getSender().isParallel()
               && (this.getDispatcher() instanceof 
GatewaySenderEventCallbackDispatcher)) {
-            Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
-            while (itr.hasNext()) {
-              GatewaySenderEventImpl event = (GatewaySenderEventImpl) 
itr.next();
+            Iterator<GatewaySenderEventImpl> eventItr = 
filteredList.iterator();
+            while (eventItr.hasNext()) {
+              GatewaySenderEventImpl event = (GatewaySenderEventImpl) 
eventItr.next();
               PartitionedRegion qpr = null;
               if (this.getQueue() instanceof 
ConcurrentParallelGatewaySenderQueue) {
                 qpr = ((ConcurrentParallelGatewaySenderQueue) this.getQueue())
@@ -726,7 +738,7 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
     } // for
   }

-  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher 
dispatcher) {
+  protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher 
dispatcher) {
     // onyly in case of remote dispatcher we send versioned events
     return false;
   }{noformat}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to