[ 
https://issues.apache.org/jira/browse/GEODE-4659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaojian zhou reassigned GEODE-4659:
------------------------------------

    Assignee: xiaojian zhou

> AbstractGatewaySenderEventProcessor put loop of filter in wrong place
> ---------------------------------------------------------------------
>
>                 Key: GEODE-4659
>                 URL: https://issues.apache.org/jira/browse/GEODE-4659
>             Project: Geode
>          Issue Type: Bug
>          Components: wan
>            Reporter: xiaojian zhou
>            Assignee: xiaojian zhou
>            Priority: Major
>
> {noformat}
> When fixing GEODE-3967, I found that the filter loop is in the wrong place.
>  
> The processing that ignores UPDATE_VERSION_STAMP events and events with CME
> should have nothing to do with filters. But currently, if there's no
> filter defined, the code will not ignore UPDATE_VERSION_STAMP events or events
> with CME, because that check lives inside the loop over the filters.
>  
> However, once this problem is fixed, GEODE-3967 has more race conditions to
> be fixed (I have fixed several of them). It looks like this bug hid other
> race conditions and kept them from surfacing.
>  
> Given the time constraints, I will not fix the filter issue in GEODE-3967 and
> am logging this bug for future reference.
>  
> Here is the diff to fix this bug:
> diff --git 
> a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
>  
> b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
> index 8739a8f72..a3a89fbd0 100644
> --- 
> a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
> +++ 
> b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
> @@ -81,40 +81,8 @@ public class RemoteParallelGatewaySenderEventProcessor 
> extends ParallelGatewaySe
>     * @param disp
>     * @return true if remote site Gemfire Version is >= 7.0.1
>     */
> -  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp)
> -      throws GatewaySenderException {
> -    try {
> -      GatewaySenderEventRemoteDispatcher remoteDispatcher =
> -          (GatewaySenderEventRemoteDispatcher) disp;
> -      // This will create a new connection if no batch has been sent till
> -      // now.
> -      Connection conn = remoteDispatcher.getConnection(false);
> -      if (conn != null) {
> -        short remoteSiteVersion = conn.getWanSiteVersion();
> -        if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) {
> -          return true;
> -        }
> -      }
> -    } catch (GatewaySenderException e) {
> -      Throwable cause = e.getCause();
> -      if (cause instanceof IOException || e instanceof 
> GatewaySenderConfigurationException
> -          || cause instanceof ConnectionDestroyedException) {
> -        try {
> -          int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL;
> -          if (logger.isDebugEnabled()) {
> -            logger.debug("Sleeping for {} milliseconds", sleepInterval);
> -          }
> -          Thread.sleep(sleepInterval);
> -        } catch (InterruptedException ie) {
> -          // log the exception
> -          if (logger.isDebugEnabled()) {
> -            logger.debug(ie.getMessage(), ie);
> -          }
> -        }
> -      }
> -      throw e;
> -    }
> -    return false;
> +  protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher 
> disp) {
> +    return true;
>    }
> }
> diff --git 
> a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
>  
> b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
> index 69005e02b..da5d1baee 100644
> --- 
> a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
> +++ 
> b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
> @@ -19,6 +19,7 @@ import org.apache.logging.log4j.Logger;
> import org.apache.geode.cache.wan.GatewaySender;
> import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
> import 
> org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
> +import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
> import org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
> import org.apache.geode.internal.logging.LogService;
> @@ -44,4 +45,14 @@ public class RemoteSerialGatewaySenderEventProcessor 
> extends SerialGatewaySender
>      }
>    }
> +  /**
> +   * Returns if corresponding receiver WAN site of this GatewaySender has 
> GemfireVersion > 7.0.1
> +   *
> +   * @param disp
> +   * @return true if remote site Gemfire Version is >= 7.0.1
> +   */
> +  protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher 
> disp) {
> +    return true;
> +  }
> +
> }
> diff --git 
> a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
>  
> b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
> index 7e67e9bfb..439394382 100644
> --- 
> a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
> +++ 
> b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
> @@ -509,27 +509,38 @@ public abstract class 
> AbstractGatewaySenderEventProcessor extends Thread {
>            }
>            // Filter the events
> -          for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
> -            Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
> -            while (itr.hasNext()) {
> -              GatewayQueueEvent event = itr.next();
> -
> -              // This seems right place to prevent transmission of 
> UPDATE_VERSION events if
> -              // receiver's
> -              // version is < 7.0.1, especially to prevent another loop over 
> events.
> -              if (!sendUpdateVersionEvents
> -                  && event.getOperation() == Operation.UPDATE_VERSION_STAMP) 
> {
> -                if (isTraceEnabled) {
> -                  logger.trace(
> -                      "Update Event Version event: {} removed from Gateway 
> Sender queue: {}", event,
> -                      sender);
> -                }
> +          Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
> +          while (itr.hasNext()) {
> +            GatewayQueueEvent event = itr.next();
> +
> +            // This seems right place to prevent transmission of 
> UPDATE_VERSION events if
> +            // receiver's
> +            // version is < 7.0.1, especially to prevent another loop over 
> events.
> +            if (!sendUpdateVersionEvents
> +                && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
> +              if (isDebugEnabled) {
> +                logger.debug("Update Event Version event: {} removed from 
> Gateway Sender queue: {}",
> +                    event, sender);
> +              }
> -                itr.remove();
> -                statistics.incEventsNotQueued();
> -                continue;
> +              itr.remove();
> +              statistics.incEventsNotQueued();
> +              continue;
> +            }
> +
> +            if (((GatewaySenderEventImpl) event).isConcurrencyConflict()) {
> +              if (isDebugEnabled) {
> +                logger.debug(
> +                    "Event with concurrent modification conflict: {} will be 
> removed from Gateway Sender queue: {}",
> +                    event, sender);
>                }
> +              itr.remove();
> +              statistics.incEventsNotQueued();
> +              continue;
> +            }
> +
> +            for (GatewayEventFilter filter : 
> sender.getGatewayEventFilters()) {
>                boolean transmit = filter.beforeTransmit(event);
>                if (!transmit) {
>                  if (isDebugEnabled) {
> @@ -538,6 +549,7 @@ public abstract class AbstractGatewaySenderEventProcessor 
> extends Thread {
>                  }
>                  itr.remove();
>                  statistics.incEventsFiltered();
> +                break;
>                }
>              }
>            }
> @@ -550,9 +562,9 @@ public abstract class AbstractGatewaySenderEventProcessor 
> extends Thread {
>            // AsyncEventQueue since possibleDuplicate flag is not used in WAN.
>            if (this.getSender().isParallel()
>                && (this.getDispatcher() instanceof 
> GatewaySenderEventCallbackDispatcher)) {
> -            Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
> -            while (itr.hasNext()) {
> -              GatewaySenderEventImpl event = (GatewaySenderEventImpl) 
> itr.next();
> +            Iterator<GatewaySenderEventImpl> eventItr = 
> filteredList.iterator();
> +            while (eventItr.hasNext()) {
> +              GatewaySenderEventImpl event = (GatewaySenderEventImpl) 
> eventItr.next();
>                PartitionedRegion qpr = null;
>                if (this.getQueue() instanceof 
> ConcurrentParallelGatewaySenderQueue) {
>                  qpr = ((ConcurrentParallelGatewaySenderQueue) 
> this.getQueue())
> @@ -726,7 +738,7 @@ public abstract class AbstractGatewaySenderEventProcessor 
> extends Thread {
>      } // for
>    }
> -  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher 
> dispatcher) {
> +  protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher 
> dispatcher) {
>      // onyly in case of remote dispatcher we send versioned events
>      return false;
>    }{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to