[ https://issues.apache.org/jira/browse/GEODE-4659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
xiaojian zhou updated GEODE-4659: --------------------------------- Issue Type: Bug (was: New Feature) > AbstractGatewaySenderEventProcessor put loop of filter in wrong place > --------------------------------------------------------------------- > > Key: GEODE-4659 > URL: https://issues.apache.org/jira/browse/GEODE-4659 > Project: Geode > Issue Type: Bug > Components: wan > Reporter: xiaojian zhou > Priority: Major > > {noformat} > When fixing GEODE-3967, I found that the filter loop is in the wrong place. > > The processing that ignores UPDATE_VERSION_STAMP events > and events with CME should have nothing to do with filters. But if no > filter is defined, the code will not ignore UPDATE_VERSION_STAMP events and events > with CME. > > However, once this problem is fixed, GEODE-3967 has more race conditions to > be fixed (I have fixed several of them). It looks like this bug hid other > race conditions from surfacing. > > Given the time constraint, I will not fix the filter issue in GEODE-3967 and > will log this bug for future reference.
> > Here are the diff to fix or this bug: > diff --git > a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java > > b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java > index 8739a8f72..a3a89fbd0 100644 > --- > a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java > +++ > b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java > @@ -81,40 +81,8 @@ public class RemoteParallelGatewaySenderEventProcessor > extends ParallelGatewaySe > * @param disp > * @return true if remote site Gemfire Version is >= 7.0.1 > */ > - private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) > - throws GatewaySenderException { > - try { > - GatewaySenderEventRemoteDispatcher remoteDispatcher = > - (GatewaySenderEventRemoteDispatcher) disp; > - // This will create a new connection if no batch has been sent till > - // now. 
> - Connection conn = remoteDispatcher.getConnection(false); > - if (conn != null) { > - short remoteSiteVersion = conn.getWanSiteVersion(); > - if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) { > - return true; > - } > - } > - } catch (GatewaySenderException e) { > - Throwable cause = e.getCause(); > - if (cause instanceof IOException || e instanceof > GatewaySenderConfigurationException > - || cause instanceof ConnectionDestroyedException) { > - try { > - int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL; > - if (logger.isDebugEnabled()) { > - logger.debug("Sleeping for {} milliseconds", sleepInterval); > - } > - Thread.sleep(sleepInterval); > - } catch (InterruptedException ie) { > - // log the exception > - if (logger.isDebugEnabled()) { > - logger.debug(ie.getMessage(), ie); > - } > - } > - } > - throw e; > - } > - return false; > + protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher > disp) { > + return true; > } > } > diff --git > a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java > > b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java > index 69005e02b..da5d1baee 100644 > --- > a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java > +++ > b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java > @@ -19,6 +19,7 @@ import org.apache.logging.log4j.Logger; > import org.apache.geode.cache.wan.GatewaySender; > import org.apache.geode.internal.cache.wan.AbstractGatewaySender; > import > org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher; > +import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher; > import org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher; > import org.apache.geode.internal.logging.LogService; > @@ -44,4 +45,14 @@ 
public class RemoteSerialGatewaySenderEventProcessor > extends SerialGatewaySender > } > } > + /** > + * Returns if corresponding receiver WAN site of this GatewaySender has > GemfireVersion > 7.0.1 > + * > + * @param disp > + * @return true if remote site Gemfire Version is >= 7.0.1 > + */ > + protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher > disp) { > + return true; > + } > + > } > diff --git > a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java > > b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java > index 7e67e9bfb..439394382 100644 > --- > a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java > +++ > b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java > @@ -509,27 +509,38 @@ public abstract class > AbstractGatewaySenderEventProcessor extends Thread { > } > // Filter the events > - for (GatewayEventFilter filter : sender.getGatewayEventFilters()) { > - Iterator<GatewaySenderEventImpl> itr = filteredList.iterator(); > - while (itr.hasNext()) { > - GatewayQueueEvent event = itr.next(); > - > - // This seems right place to prevent transmission of > UPDATE_VERSION events if > - // receiver's > - // version is < 7.0.1, especially to prevent another loop over > events. > - if (!sendUpdateVersionEvents > - && event.getOperation() == Operation.UPDATE_VERSION_STAMP) > { > - if (isTraceEnabled) { > - logger.trace( > - "Update Event Version event: {} removed from Gateway > Sender queue: {}", event, > - sender); > - } > + Iterator<GatewaySenderEventImpl> itr = filteredList.iterator(); > + while (itr.hasNext()) { > + GatewayQueueEvent event = itr.next(); > + > + // This seems right place to prevent transmission of > UPDATE_VERSION events if > + // receiver's > + // version is < 7.0.1, especially to prevent another loop over > events. 
> + if (!sendUpdateVersionEvents > + && event.getOperation() == Operation.UPDATE_VERSION_STAMP) { > + if (isDebugEnabled) { > + logger.debug("Update Event Version event: {} removed from > Gateway Sender queue: {}", > + event, sender); > + } > - itr.remove(); > - statistics.incEventsNotQueued(); > - continue; > + itr.remove(); > + statistics.incEventsNotQueued(); > + continue; > + } > + > + if (((GatewaySenderEventImpl) event).isConcurrencyConflict()) { > + if (isDebugEnabled) { > + logger.debug( > + "Event with concurrent modification conflict: {} will be > removed from Gateway Sender queue: {}", > + event, sender); > } > + itr.remove(); > + statistics.incEventsNotQueued(); > + continue; > + } > + > + for (GatewayEventFilter filter : > sender.getGatewayEventFilters()) { > boolean transmit = filter.beforeTransmit(event); > if (!transmit) { > if (isDebugEnabled) { > @@ -538,6 +549,7 @@ public abstract class AbstractGatewaySenderEventProcessor > extends Thread { > } > itr.remove(); > statistics.incEventsFiltered(); > + break; > } > } > } > @@ -550,9 +562,9 @@ public abstract class AbstractGatewaySenderEventProcessor > extends Thread { > // AsyncEventQueue since possibleDuplicate flag is not used in WAN. 
> if (this.getSender().isParallel() > && (this.getDispatcher() instanceof > GatewaySenderEventCallbackDispatcher)) { > - Iterator<GatewaySenderEventImpl> itr = filteredList.iterator(); > - while (itr.hasNext()) { > - GatewaySenderEventImpl event = (GatewaySenderEventImpl) > itr.next(); > + Iterator<GatewaySenderEventImpl> eventItr = > filteredList.iterator(); > + while (eventItr.hasNext()) { > + GatewaySenderEventImpl event = (GatewaySenderEventImpl) > eventItr.next(); > PartitionedRegion qpr = null; > if (this.getQueue() instanceof > ConcurrentParallelGatewaySenderQueue) { > qpr = ((ConcurrentParallelGatewaySenderQueue) > this.getQueue()) > @@ -726,7 +738,7 @@ public abstract class AbstractGatewaySenderEventProcessor > extends Thread { > } // for > } > - private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher > dispatcher) { > + protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher > dispatcher) { > // onyly in case of remote dispatcher we send versioned events > return false; > }{noformat} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)