This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit 7e0888b2542bc027ec6889d007725b0dcbe029f7 Author: Jacob Barrett <jbarr...@pivotal.io> AuthorDate: Thu May 20 15:43:21 2021 -0700 GEODE-6588: Cleanup AbstractUpdateOperation --- .../internal/cache/AbstractUpdateOperation.java | 57 +-- .../internal/cache/CacheDistributionAdvisor.java | 34 +- .../internal/cache/DistributedCacheOperation.java | 387 ++++++++++----------- .../cache/DistributedCacheOperationTest.java | 11 +- 4 files changed, 224 insertions(+), 265 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java index 69bfa39..32df318 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java @@ -33,7 +33,6 @@ import org.apache.geode.cache.Scope; import org.apache.geode.cache.TimeoutException; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DirectReplyProcessor; -import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; import org.apache.geode.internal.cache.versions.VersionTag; @@ -57,13 +56,13 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation private final long lastModifiedTime; - public AbstractUpdateOperation(CacheEvent event, long lastModifiedTime) { + public AbstractUpdateOperation(CacheEvent<?, ?> event, long lastModifiedTime) { super(event); this.lastModifiedTime = lastModifiedTime; } @Override - protected Set getRecipients() { + protected Set<InternalDistributedMember> getRecipients() { CacheDistributionAdvisor advisor = getRegion().getCacheDistributionAdvisor(); return advisor.adviseUpdate(getEvent()); } @@ -72,11 +71,7 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation protected void initMessage(CacheOperationMessage msg, DirectReplyProcessor pr) { super.initMessage(msg, pr); AbstractUpdateMessage m = (AbstractUpdateMessage) msg; - DistributedRegion region = getRegion(); - DistributionManager mgr = region.getDistributionManager(); - // [bruce] We might have to stop using cacheTimeMillis because it causes a skew between - // lastModified and the version tag's timestamp - m.lastModified = this.lastModifiedTime; + m.lastModified = lastModifiedTime; } private static final boolean ALWAYS_REPLICATE_UPDATES = @@ -85,19 +80,12 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation /** @return whether we should do a local create for a remote one */ private static boolean shouldDoRemoteCreate(LocalRegion rgn, EntryEventImpl ev) { DataPolicy dp = rgn.getAttributes().getDataPolicy(); - if (!rgn.isAllEvents() || (dp.withReplication() && rgn.isInitialized() - && ev.getOperation().isUpdate() && !rgn.getConcurrencyChecksEnabled() - // misordered CREATE and - // UPDATE messages can - // cause inconsistencies - && !ALWAYS_REPLICATE_UPDATES)) { - // we are not accepting all events - // or we are a replicate and initialized and it was an update - // (we exclude that latter to avoid resurrecting a key deleted in a replicate - return false; - } else { - return true; - } + // we are not accepting all events or we are a replicate and initialized and it was an update + // (we exclude that latter to avoid resurrecting a key deleted in a replicate + // misordered CREATE and UPDATE messages can cause inconsistencies + return rgn.isAllEvents() && (!dp.withReplication() || !rgn.isInitialized() + || !ev.getOperation().isUpdate() || rgn.getConcurrencyChecksEnabled() + || ALWAYS_REPLICATE_UPDATES); } private static boolean checkIfToUpdateAfterCreateFailed(LocalRegion rgn, EntryEventImpl ev) { @@ -200,7 +188,6 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation if (logger.isTraceEnabled()) { logger.trace("Processing put key {} in region {}", ev.getKey(), rgn.getFullPath()); } - updated = true; } else { // key not here, blocked by DESTROYED token or ConcurrentCacheModificationException // thrown during second update attempt @@ -211,7 +198,6 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation rgn.basicUpdate(ev, false, false, lastMod, true, invokeCallbacks, false); if (thirdBasicUpdateSuccess) { rgn.getCachePerfStats().endPut(startPut, ev.isOriginRemote()); - updated = true; } } else { if (rgn.getVersionVector() != null && ev.getVersionTag() != null) { @@ -264,25 +250,20 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation protected long lastModified; @Override - protected boolean operateOnRegion(CacheEvent event, ClusterDistributionManager dm) + protected boolean operateOnRegion(CacheEvent<?, ?> event, ClusterDistributionManager dm) throws EntryNotFoundException { EntryEventImpl ev = (EntryEventImpl) event; DistributedRegion rgn = (DistributedRegion) ev.getRegion(); - DistributionManager mgr = dm; - boolean sendReply = true; // by default tell caller to send ack - // if (!rgn.hasSeenEvent((InternalCacheEvent)event)) { if (!rgn.isCacheContentProxy()) { basicOperateOnRegion(ev, rgn); - } - // } - else { + } else { if (logger.isDebugEnabled()) { logger.debug("UpdateMessage: this cache has already seen this event {}", event); } } - return sendReply; + return true; // tell caller to send ack } // @todo darrel: make this method static? @@ -297,15 +278,15 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation logger.debug("Processing {}", this); } try { - long time = this.lastModified; + long time = lastModified; if (ev.getVersionTag() != null) { checkVersionTag(rgn, ev.getVersionTag()); time = ev.getVersionTag().getVersionTimeStamp(); } - this.appliedOperation = doPutOrCreate(rgn, ev, time); + appliedOperation = doPutOrCreate(rgn, ev, time); } catch (ConcurrentCacheModificationException e) { dispatchElidedEvent(rgn, ev); - this.appliedOperation = false; + appliedOperation = false; } } @@ -313,25 +294,25 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation protected void appendFields(StringBuilder buff) { super.appendFields(buff); buff.append("; lastModified="); - buff.append(this.lastModified); + buff.append(lastModified); } @Override public void fromData(DataInput in, DeserializationContext context) throws IOException, ClassNotFoundException { super.fromData(in, context); - this.lastModified = in.readLong(); + lastModified = in.readLong(); } @Override public void toData(DataOutput out, SerializationContext context) throws IOException { super.toData(out, context); - out.writeLong(this.lastModified); + out.writeLong(lastModified); } protected void checkVersionTag(DistributedRegion rgn, VersionTag tag) { - RegionAttributes attr = rgn.getAttributes(); + RegionAttributes<?, ?> attr = rgn.getAttributes(); if (attr.getConcurrencyChecksEnabled() && attr.getDataPolicy().withPersistence() && attr.getScope() != Scope.GLOBAL && (tag.getMemberID() == null || test_InvalidVersion)) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java index 5879cb6..3d49278 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java @@ -188,7 +188,8 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { * Provide recipient information for an update or create operation. * */ - Set adviseUpdate(final EntryEventImpl event) throws IllegalStateException { + Set<InternalDistributedMember> adviseUpdate(final EntryEventImpl event) + throws IllegalStateException { if (event.hasNewValue() || event.getOperation().isPutAll()) { // only need to distribute it to members that want all events or cache data return adviseAllEventsOrCached(); @@ -246,7 +247,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { }); } else { StringBuilder badIds = new StringBuilder(); - Iterator biI = badList.iterator(); + Iterator<InternalDistributedMember> biI = badList.iterator(); while (biI.hasNext()) { badIds.append(biI.next().toString()); if (biI.hasNext()) { @@ -254,8 +255,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { } } throw new IllegalStateException( - String.format("Illegal Region Configuration for members: %s", - badIds.toString())); + String.format("Illegal Region Configuration for members: %s", badIds)); } } @@ -265,7 +265,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { * @return Set of Serializable members that have a CacheLoader installed; no reference to Set kept * by advisor so caller is free to modify it */ - public Set adviseNetLoad() { + public Set<InternalDistributedMember> adviseNetLoad() { return adviseFilter(profile -> { assert profile instanceof CacheProfile; CacheProfile prof = (CacheProfile) profile; @@ -279,7 +279,8 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { }); } - public FilterRoutingInfo adviseFilterRouting(CacheEvent event, Set cacheOpRecipients) { + public FilterRoutingInfo adviseFilterRouting(CacheEvent<?, ?> event, + Set<InternalDistributedMember> cacheOpRecipients) { FilterProfile fp = ((LocalRegion) event.getRegion()).getFilterProfile(); if (fp != null) { return fp.getFilterRoutingInfoPart1(event, profiles, cacheOpRecipients); @@ -310,7 +311,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { /** * Same as adviseGeneric */ - public Set adviseDestroyRegion() { + public Set<InternalDistributedMember> adviseDestroyRegion() { return adviseGeneric(); } @@ -320,7 +321,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { * @return Set of Serializable member ids that have a CacheWriter installed; no reference to Set * kept by advisor so caller is free to modify it */ - public Set adviseNetWrite() { + public Set<InternalDistributedMember> adviseNetWrite() { return adviseFilter(profile -> { assert profile instanceof CacheProfile; CacheProfile prof = (CacheProfile) profile; @@ -347,7 +348,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { * @return Set of Serializable member ids that have the region and are have storage (no need to * search an empty cache) */ - Set adviseNetSearch() { + Set<InternalDistributedMember> adviseNetSearch() { return adviseFilter(profile -> { assert profile instanceof CacheProfile; CacheProfile cp = (CacheProfile) profile; @@ -452,7 +453,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { * * @since GemFire 5.5 */ - Set adviseRequiresOldValueInCacheOp() { + Set<InternalDistributedMember> adviseRequiresOldValueInCacheOp() { return adviseFilter(profile -> { assert profile instanceof CacheProfile; CacheProfile cp = (CacheProfile) profile; @@ -675,11 +676,11 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { public boolean getInRecovery() { return inRecovery; - }; + } public void setInRecovery(boolean recovery) { inRecovery = recovery; - }; + } public DataPolicy getDataPolicy() { return dataPolicy; @@ -743,9 +744,6 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { * Return true if cached or allEvents and a listener */ boolean cachedOrAllEventsWithListener() { - // to fix bug 36804 to ignore hasCacheListener - // return this.dataPolicy.withStorage() || - // (allEvents() && this.hasCacheListener); return cachedOrAllEvents(); } @@ -1038,7 +1036,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { * @return the set of preloaded's memberIds * @since GemFire prPersistSprint1 */ - public Set advisePreloadeds() { + public Set<InternalDistributedMember> advisePreloadeds() { return adviseFilter(profile -> { assert profile instanceof CacheProfile; CacheProfile cp = (CacheProfile) profile; @@ -1052,7 +1050,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { * @return the set of replicate's memberIds * @since GemFire 5.8 */ - Set adviseEmptys() { + Set<InternalDistributedMember> adviseEmptys() { return adviseFilter(profile -> { assert profile instanceof CacheProfile; CacheProfile cp = (CacheProfile) profile; @@ -1106,7 +1104,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { return result; } - Set adviseCacheServers() { + Set<InternalDistributedMember> adviseCacheServers() { getAdvisee().getCancelCriterion().checkCancelInProgress(null); return adviseFilter(profile -> { assert profile instanceof CacheProfile; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index 4a53ef6..e32a2af 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.Vector; import org.apache.logging.log4j.Logger; @@ -138,11 +137,6 @@ public abstract class DistributedCacheOperation { } } - // static values for oldValueIsObject - public static final byte VALUE_IS_BYTES = 0; - public static final byte VALUE_IS_SERIALIZED_OBJECT = 1; - public static final byte VALUE_IS_OBJECT = 2; - /** * Given a VALUE_IS_* constant convert and return the corresponding DESERIALIZATION_POLICY_*. */ @@ -169,9 +163,9 @@ public abstract class DistributedCacheOperation { protected CacheOperationReplyProcessor processor = null; - protected Set departedMembers; + protected Set<InternalDistributedMember> departedMembers; - protected Set originalRecipients; + protected Set<InternalDistributedMember> originalRecipients; @MutableForTesting static Runnable internalBeforePutOutgoing; @@ -188,7 +182,7 @@ public abstract class DistributedCacheOperation { } /** Creates a new instance of DistributedCacheOperation */ - public DistributedCacheOperation(CacheEvent event) { + public DistributedCacheOperation(CacheEvent<?, ?> event) { this.event = (InternalCacheEvent) event; } @@ -200,24 +194,19 @@ public abstract class DistributedCacheOperation { * @since GemFire 5.0 */ boolean isOperationReliable() { - Operation op = this.event.getOperation(); + Operation op = event.getOperation(); if (!op.isRegionDestroy()) { return true; } - if (op.isDistributed()) { - return true; - } // must be a region destroy that is "local" which means // Region.localDestroyRegion or Region.close or Cache.clsoe // none of these should do reliability checks - return false; + return op.isDistributed(); } public boolean supportsDirectAck() { // force use of shared connection if we're already in a secondary - // thread-owned reader thread. See bug #49565. Also see Connection#processNIOBuffer - // int dominoCount = org.apache.geode.internal.tcp.Connection.getDominoCount(); - // return dominoCount < 2; + // thread-owned reader thread. See Connection#processNIOBuffer return true; } @@ -267,7 +256,7 @@ public abstract class DistributedCacheOperation { DistributedRegion region = getRegion(); long viewVersion = -1; try { - if (this.containsRegionContentChange()) { + if (containsRegionContentChange()) { viewVersion = region.getDistributionAdvisor().startOperation(); } if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) { @@ -354,13 +343,13 @@ public abstract class DistributedCacheOperation { } // some members requiring old value are also in the cache op recipients set - Set needsOldValueInCacheOp = Collections.emptySet(); + Set<InternalDistributedMember> needsOldValueInCacheOp = Collections.emptySet(); // set client routing information into the event boolean routingComputed = false; FilterRoutingInfo filterRouting = null; // recipients that will get a cacheop msg and also a PR message - Set twoMessages = Collections.emptySet(); + Set<InternalDistributedMember> twoMessages = Collections.emptySet(); if (region.isUsedForPartitionedRegionBucket()) { twoMessages = ((Bucket) region).getBucketAdvisor().adviseRequiresTwoMessages(); routingComputed = true; @@ -374,12 +363,12 @@ public abstract class DistributedCacheOperation { // some members need PR notification of the change for client/wan // notification - Set adjunctRecipients = Collections.emptySet(); + Set<InternalDistributedMember> adjunctRecipients = Collections.emptySet(); // Partitioned region listener notification messages piggyback on this // operation's replyprocessor and need to be sent at the same time as // the operation's message - if (this.supportsAdjunctMessaging() && region.isUsedForPartitionedRegionBucket()) { + if (supportsAdjunctMessaging() && region.isUsedForPartitionedRegionBucket()) { BucketRegion br = (BucketRegion) region; adjunctRecipients = getAdjunctReceivers(br, recipients, twoMessages, filterRouting); } @@ -388,7 +377,7 @@ public abstract class DistributedCacheOperation { if (entryEvent != null && entryEvent.hasOldValue()) { if (testSendingOldValues) { - needsOldValueInCacheOp = new HashSet(recipients); + needsOldValueInCacheOp = new HashSet<>(recipients); } else { needsOldValueInCacheOp = region.getCacheDistributionAdvisor().adviseRequiresOldValueInCacheOp(); @@ -396,14 +385,14 @@ public abstract class DistributedCacheOperation { recipients.removeAll(needsOldValueInCacheOp); } - Set cachelessNodes = Collections.emptySet(); - Set adviseCacheServers; + Set<InternalDistributedMember> cachelessNodes = Collections.emptySet(); + Set<InternalDistributedMember> adviseCacheServers; Set<InternalDistributedMember> cachelessNodesWithNoCacheServer = Collections.emptySet(); - if (region.getDistributionConfig().getDeltaPropagation() && this.supportsDeltaPropagation()) { + if (region.getDistributionConfig().getDeltaPropagation() && supportsDeltaPropagation()) { cachelessNodes = region.getCacheDistributionAdvisor().adviseEmptys(); if (!cachelessNodes.isEmpty()) { - List list = new ArrayList(cachelessNodes); - for (Object member : cachelessNodes) { + List<InternalDistributedMember> list = new ArrayList<>(cachelessNodes); + for (InternalDistributedMember member : cachelessNodes) { if (!recipients.contains(member) || adjunctRecipients.contains(member)) { // Don't include those originally excluded. list.remove(member); @@ -437,26 +426,24 @@ public abstract class DistributedCacheOperation { } } } - if (!reliableOp || region.isNoDistributionOk()) { - // nothing needs be done in this case - } else { + if (reliableOp && !region.isNoDistributionOk()) { region.handleReliableDistribution(Collections.emptySet()); } // compute local client routing before waiting for an ack only for a bucket if (region.isUsedForPartitionedRegionBucket()) { FilterInfo filterInfo = getLocalFilterRouting(filterRouting); - this.event.setLocalFilterInfo(filterInfo); + event.setLocalFilterInfo(filterInfo); } } else { boolean directAck = false; boolean useMulticast = region.getMulticastEnabled() - && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast(); + && region.getSystem().getConfig().getMcastPort() != 0 && supportsMulticast(); boolean shouldAck = shouldAck(); if (shouldAck) { - if (this.supportsDirectAck() && adjunctRecipients.isEmpty()) { + if (supportsDirectAck() && adjunctRecipients.isEmpty()) { if (region.getSystem().threadOwnsResources()) { directAck = true; } @@ -481,28 +468,25 @@ public abstract class DistributedCacheOperation { if (shouldAck) { // adjunct messages are sent using the same reply processor, so // add them to the processor's membership set - Collection waitForMembers = null; - if (recipients.size() > 0 && adjunctRecipients.size() == 0 && cachelessNodes.isEmpty()) { // the - // common - // case + final Collection<InternalDistributedMember> waitForMembers; + if (recipients.size() > 0 && adjunctRecipients.size() == 0 && cachelessNodes.isEmpty()) { + // the common case waitForMembers = recipients; } else if (!cachelessNodes.isEmpty()) { - waitForMembers = new HashSet(recipients); + waitForMembers = new HashSet<>(recipients); waitForMembers.addAll(cachelessNodes); } else { - // note that we use a Vector instead of a Set for the responders - // collection - // because partitioned regions sometimes send both a regular cache - // operation and a partitioned-region notification message to the - // same recipient - waitForMembers = new Vector(recipients); + // note that we use a List instead of a Set for the responders + // collection because partitioned regions sometimes send both a regular cache + // operation and a partitioned-region notification message to the same recipient + waitForMembers = new ArrayList<>(recipients); waitForMembers.addAll(adjunctRecipients); waitForMembers.addAll(needsOldValueInCacheOp); waitForMembers.addAll(cachelessNodes); } if (DistributedCacheOperation.LOSS_SIMULATION_RATIO != 0.0) { if (LOSS_SIMULATION_GENERATOR == null) { - LOSS_SIMULATION_GENERATOR = new Random(this.hashCode()); + LOSS_SIMULATION_GENERATOR = new Random(hashCode()); } if ((LOSS_SIMULATION_GENERATOR.nextInt(100) * 1.0 / 100.0) < LOSS_SIMULATION_RATIO) { if (logger.isDebugEnabled()) { @@ -514,24 +498,23 @@ public abstract class DistributedCacheOperation { } } if (reliableOp) { - this.departedMembers = new HashSet(); - this.processor = new ReliableCacheReplyProcessor(region.getSystem(), waitForMembers, - this.departedMembers); + departedMembers = new HashSet<>(); + processor = new ReliableCacheReplyProcessor(region.getSystem(), waitForMembers, + departedMembers); } else { - this.processor = new CacheOperationReplyProcessor(region.getSystem(), waitForMembers); + processor = new CacheOperationReplyProcessor(region.getSystem(), waitForMembers); } } - Set failures = null; CacheOperationMessage msg = createMessage(); - initMessage(msg, this.processor); + initMessage(msg, processor); if (DistributedCacheOperation.internalBeforePutOutgoing != null) { DistributedCacheOperation.internalBeforePutOutgoing.run(); } if (processor != null && msg.isSevereAlertCompatible()) { - this.processor.enableSevereAlertProcessing(); + processor.enableSevereAlertProcessing(); // if this message is distributing for a partitioned region message, // we can't wait as long as the full ack-severe-alert-threshold or // the sender might kick us out of the system before we can get an ack @@ -570,14 +553,14 @@ public abstract class DistributedCacheOperation { } msg.setRecipients(recipients); - failures = mgr.putOutgoing(msg); + Set<InternalDistributedMember> failures = mgr.putOutgoing(msg); // distribute to members needing the old value now if (needsOldValueInCacheOp.size() > 0) { - msg.appendOldValueToMessage((EntryEventImpl) this.event); + msg.appendOldValueToMessage((EntryEventImpl) event); msg.resetRecipients(); msg.setRecipients(needsOldValueInCacheOp); - Set newFailures = mgr.putOutgoing(msg); + Set<InternalDistributedMember> newFailures = mgr.putOutgoing(msg); if (newFailures != null) { if (logger.isDebugEnabled()) { logger.debug("Failed sending ({}) to {}", msg, newFailures); @@ -596,7 +579,7 @@ public abstract class DistributedCacheOperation { msg.resetRecipients(); msg.setRecipients(cachelessNodes); msg.setSendDelta(false); - Set newFailures = mgr.putOutgoing(msg); + Set<InternalDistributedMember> newFailures = mgr.putOutgoing(msg); if (newFailures != null) { if (failures != null && failures.size() > 0) { failures.addAll(newFailures); @@ -611,7 +594,7 @@ public abstract class DistributedCacheOperation { msg.setRecipients(cachelessNodesWithNoCacheServer); msg.setSendDelta(false); ((UpdateMessage) msg).setSendDeltaWithFullValue(false); - Set newFailures = mgr.putOutgoing(msg); + Set<InternalDistributedMember> newFailures = mgr.putOutgoing(msg); if (newFailures != null) { if (failures != null && failures.size() > 0) { failures.addAll(newFailures); @@ -628,7 +611,6 @@ public abstract class DistributedCacheOperation { logger.debug("Failed sending ({}) to {} while processing event:{}", msg, failures, event); } - Set<InternalDistributedMember> adjunctRecipientsWithNoCacheServer = Collections.emptySet(); // send partitioned region listener notification messages now if (!adjunctRecipients.isEmpty()) { if (cachelessNodes.size() > 0) { @@ -640,29 +622,31 @@ public abstract class DistributedCacheOperation { recipients.addAll(cachelessNodes); } } - adjunctRecipientsWithNoCacheServer = new HashSet<>(adjunctRecipients); + + final Set<InternalDistributedMember> adjunctRecipientsWithNoCacheServer = + new HashSet<>(adjunctRecipients); adviseCacheServers = ((Bucket) region).getPartitionedRegion() .getCacheDistributionAdvisor().adviseCacheServers(); adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers); if (isPutAll) { ((BucketRegion) region).performPutAllAdjunctMessaging((DistributedPutAllOperation) this, - recipients, adjunctRecipients, filterRouting, this.processor); + recipients, adjunctRecipients, filterRouting, processor); } else if (isRemoveAll) { ((BucketRegion) region).performRemoveAllAdjunctMessaging( (DistributedRemoveAllOperation) this, recipients, adjunctRecipients, filterRouting, - this.processor); + processor); } else { boolean calculateDelta = adjunctRecipientsWithNoCacheServer.size() < adjunctRecipients.size(); adjunctRecipients.removeAll(adjunctRecipientsWithNoCacheServer); if (!adjunctRecipients.isEmpty()) { ((BucketRegion) region).performAdjunctMessaging(getEvent(), recipients, - adjunctRecipients, filterRouting, this.processor, calculateDelta, true); + adjunctRecipients, filterRouting, processor, calculateDelta, true); } if (!adjunctRecipientsWithNoCacheServer.isEmpty()) { ((BucketRegion) region).performAdjunctMessaging(getEvent(), recipients, - adjunctRecipientsWithNoCacheServer, filterRouting, this.processor, calculateDelta, + adjunctRecipientsWithNoCacheServer, filterRouting, processor, calculateDelta, false); } } @@ -674,10 +658,10 @@ public abstract class DistributedCacheOperation { event.setLocalFilterInfo(filterInfo); } - waitForAckIfNeeded(msg, persistentIds); + waitForAckIfNeeded(persistentIds); if (/* msg != null && */reliableOp) { - Set successfulRecips = new HashSet(recipients); + Set<InternalDistributedMember> successfulRecips = new HashSet<>(recipients); successfulRecips.addAll(cachelessNodes); successfulRecips.addAll(needsOldValueInCacheOp); if (failures != null && !failures.isEmpty()) { @@ -747,8 +731,8 @@ public abstract class DistributedCacheOperation { // the entry form CQ cache. if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID) && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY) - && ((EntryOperation) event).getKey() != null) { - cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true); + && ((EntryOperation<?, ?>) event).getKey() != null) { + cq.removeFromCqResultKeys(((EntryOperation<?, ?>) event).getKey(), true); } } } @@ -762,9 +746,10 @@ public abstract class DistributedCacheOperation { * messages * @param routing client routing information */ - Set getAdjunctReceivers(BucketRegion br, Set cacheOpReceivers, Set twoMessages, + Set<InternalDistributedMember> getAdjunctReceivers(BucketRegion br, + Set<InternalDistributedMember> cacheOpReceivers, Set<InternalDistributedMember> twoMessages, FilterRoutingInfo routing) { - return br.getAdjunctReceivers(this.getEvent(), cacheOpReceivers, twoMessages, routing); + return br.getAdjunctReceivers(getEvent(), cacheOpReceivers, twoMessages, routing); } /** @@ -774,16 +759,16 @@ public abstract class DistributedCacheOperation { // nothing to do here - see UpdateMessage } - protected void waitForAckIfNeeded(CacheOperationMessage msg, + protected void waitForAckIfNeeded( Map<InternalDistributedMember, PersistentMemberID> persistentIds) { - if (this.processor == null) { + if (processor == null) { return; } try { // keep waiting even if interrupted try { - this.processor.waitForRepliesUninterruptibly(); - Set<InternalDistributedMember> closedMembers = this.processor.closedMembers.getSnapshot(); + processor.waitForRepliesUninterruptibly(); + Set<InternalDistributedMember> closedMembers = processor.closedMembers.getSnapshot(); handleClosedMembers(closedMembers, persistentIds); } catch (ReplyException e) { if (this instanceof DestroyRegionOperation) { @@ -792,7 +777,7 @@ public abstract class DistributedCacheOperation { e.handleCause(); } } finally { - this.processor = null; + processor = null; } } @@ -828,27 +813,28 @@ public abstract class DistributedCacheOperation { } protected DistributedRegion getRegion() { - return (DistributedRegion) this.event.getRegion(); + return (DistributedRegion) event.getRegion(); } protected EntryEventImpl getEvent() { - return (EntryEventImpl) this.event; + return (EntryEventImpl) event; } - protected Set getRecipients() { + protected Set<InternalDistributedMember> getRecipients() { CacheDistributionAdvisor advisor = getRegion().getCacheDistributionAdvisor(); - this.originalRecipients = advisor.adviseCacheOp(); - return this.originalRecipients; + originalRecipients = advisor.adviseCacheOp(); + return originalRecipients; } - protected FilterRoutingInfo getRecipientFilterRouting(Set cacheOpRecipients) { + protected FilterRoutingInfo getRecipientFilterRouting( + Set<InternalDistributedMember> cacheOpRecipients) { LocalRegion region = getRegion(); if (!region.isUsedForPartitionedRegionBucket()) { return null; } CacheDistributionAdvisor advisor; advisor = region.getPartitionedRegion().getCacheDistributionAdvisor(); - return advisor.adviseFilterRouting(this.event, cacheOpRecipients); + return advisor.adviseFilterRouting(event, cacheOpRecipients); } /** @@ -860,7 +846,7 @@ public abstract class DistributedCacheOperation { if (fp == null) { return null; } - FilterRoutingInfo fri = fp.getFilterRoutingInfoPart2(frInfo, this.event); + FilterRoutingInfo fri = fp.getFilterRoutingInfoPart2(frInfo, event); if (fri == null) { return null; } @@ -873,7 +859,7 @@ public abstract class DistributedCacheOperation { msg.regionPath = getRegion().getFullPath(); msg.processorId = p == null ? 0 : p.getProcessorId(); msg.processor = p; - if (this.event.getOperation().isEntry()) { + if (event.getOperation().isEntry()) { EntryEventImpl entryEvent = getEvent(); msg.callbackArg = entryEvent.getRawCallbackArgument(); msg.possibleDuplicate = entryEvent.isPossibleDuplicate(); @@ -885,9 +871,9 @@ public abstract class DistributedCacheOperation { } } else { - msg.callbackArg = ((RegionEventImpl) this.event).getRawCallbackArgument(); + msg.callbackArg = ((RegionEventImpl) event).getRawCallbackArgument(); } - msg.op = this.event.getOperation(); + msg.op = event.getOperation(); msg.owner = this; msg.regionAllowsConflation = getRegion().getEnableAsyncConflation(); @@ -896,7 +882,7 @@ public abstract class DistributedCacheOperation { @Override public String toString() { String cname = getClass().getName().substring(getClass().getPackage().getName().length() + 1); - return cname + "(" + this.event + ")"; + return cname + "(" + event + ")"; } /** @@ -962,16 +948,16 @@ public abstract class DistributedCacheOperation { protected boolean inhibitAllNotifications; public Operation getOperation() { - return this.op; + return op; } /** sets the concurrency versioning tag for this message */ public void setVersionTag(VersionTag tag) { - this.versionTag = tag; + versionTag = tag; } public VersionTag getVersionTag() { - return this.versionTag; + return versionTag; } @Override @@ -988,21 +974,21 @@ public abstract class DistributedCacheOperation { @Override public void registerProcessor() { if (processor != null) { - this.processorId = this.processor.register(); + processorId = processor.register(); } - this.directAck = false; + directAck = false; } public void setFilterInfo(FilterRoutingInfo fInfo) { - this.filterRouting = fInfo; + filterRouting = fInfo; } public void setInhibitNotificationsBit(boolean inhibit) { - this.inhibitAllNotifications = inhibit; + inhibitAllNotifications = inhibit; } public String getRegionPath() { - return this.regionPath; + return regionPath; } /** @@ -1043,11 +1029,11 @@ public abstract class DistributedCacheOperation { CqService cqService = event.getRegion().getCache().getCqService(); if (cqService.isRunning()/* || event.getOperation().guaranteesOldValue() */) { event.setOldValueForQueryProcessing(); - if (!event.hasOldValue() && this.hasOldValue) { - if (this.oldValueIsSerialized) { - event.setSerializedOldValue((byte[]) this.oldValue); + if (!event.hasOldValue() && hasOldValue) { + if (oldValueIsSerialized) { + event.setSerializedOldValue((byte[]) oldValue); } else { - event.setOldValue(this.oldValue); + event.setOldValue(oldValue); } } } @@ -1059,15 +1045,15 @@ public abstract class DistributedCacheOperation { * @since GemFire 6.1 */ protected void setHasDelta(boolean flag) { - this.hasDelta = flag; + hasDelta = flag; } protected boolean hasDelta() { - return this.hasDelta; + return hasDelta; } public FilterRoutingInfo getFilterInfo() { - return this.filterRouting; + return filterRouting; } /** @@ -1079,7 +1065,7 @@ public abstract class DistributedCacheOperation { @Override public int getProcessorId() { - return this.processorId; + return processorId; } @Override @@ -1088,9 +1074,9 @@ public abstract class DistributedCacheOperation { } protected LocalRegion getLocalRegionForProcessing(ClusterDistributionManager dm) { - Assert.assertTrue(this.regionPath != null, "regionPath was null"); + Assert.assertTrue(regionPath != null, "regionPath was null"); InternalCache gfc = dm.getExistingCache(); - return (LocalRegion) gfc.getRegionByPathForProcessing(this.regionPath); + return (LocalRegion) gfc.getRegionByPathForProcessing(regionPath); } @Override @@ -1098,11 +1084,11 @@ public abstract class DistributedCacheOperation { Throwable thr = null; boolean sendReply = true; - if (this.versionTag != null) { - this.versionTag.replaceNullIDs(getSender()); + if (versionTag != null) { + versionTag.replaceNullIDs(getSender()); } - EntryLogger.setSource(this.getSender(), "p2p"); + EntryLogger.setSource(getSender(), "p2p"); final InitializationLevel oldLevel = LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE); try { @@ -1115,7 +1101,7 @@ public abstract class DistributedCacheOperation { sendReply = false; basicProcess(dm, lclRgn); } catch (CancelException ignore) { - this.closed = true; + closed = true; if (logger.isDebugEnabled()) { logger.debug("{} Cancelled: nothing to do", this); } @@ -1158,10 +1144,8 @@ public abstract class DistributedCacheOperation { logger.trace("DistributedCacheOperation.basicProcess: {}", this); } try { - // LocalRegion lclRgn = getRegionFromPath(dm.getSystem(), - // this.regionPath); if (lclRgn == null) { - this.closed = true; + closed = true; if (logger.isDebugEnabled()) { logger.debug("{} region not found, nothing to do", this); } @@ -1169,8 +1153,6 @@ public abstract class DistributedCacheOperation { } // Could this cause a deadlock, because this can block a P2P reader // thread which might be needed to read the create region reply?? - // DAN - I don't think this does anything because process called - // LocalRegion.setThreadInitLevelRequirement lclRgn.waitOnInitialization(); // In some subclasses, lclRgn may be destroyed, so be careful not to // allow a RegionDestroyedException to be thrown on lclRgn access @@ -1193,7 +1175,7 @@ public abstract class DistributedCacheOperation { try { boolean isEntry = event.getOperation().isEntry(); - if (isEntry && this.possibleDuplicate) { + if (isEntry && possibleDuplicate) { ((EntryEventImpl) event).setPossibleDuplicate(true); // If the state of the initial image yet to be received is unknown, // we must not apply the event. It may already be reflected in the @@ -1211,29 +1193,29 @@ public abstract class DistributedCacheOperation { } } - sendReply = operateOnRegion(event, dm) && sendReply; + sendReply = operateOnRegion(event, dm); } finally { if (event instanceof EntryEventImpl) { ((Releasable) event).release(); } } } catch (RegionDestroyedException ignore) { - this.closed = true; + closed = true; if (logger.isDebugEnabled()) { logger.debug("{} Region destroyed: nothing to do", this); } } catch (CancelException ignore) { - this.closed = true; + closed = true; if (logger.isDebugEnabled()) { logger.debug("{} Cancelled: nothing to do", this); } } catch (DiskAccessException e) { - this.closed = true; + closed = true; if (!lclRgn.isDestroyed()) { logger.error("Got disk access exception, expected region to be destroyed", e); } } catch (EntryNotFoundException ignore) { - this.appliedOperation = true; + appliedOperation = true; if (logger.isDebugEnabled()) { logger.debug("{} Entry not found, nothing to do", this); } @@ -1256,7 +1238,7 @@ public abstract class DistributedCacheOperation { SystemFailure.checkFailure(); thr = t; } finally { - checkVersionIsRecorded(this.versionTag, lclRgn); + checkVersionIsRecorded(versionTag, lclRgn); if (sendReply) { ReplyException rex = null; if (thr != null) { @@ -1273,10 +1255,9 @@ public abstract class DistributedCacheOperation { public void sendReply(InternalDistributedMember recipient, int pId, ReplyException rex, ReplySender dm) { - if (pId == 0 && (dm instanceof DistributionManager) && !this.directAck) {// Fix for #41871 - // distributed-no-ack message. Don't respond - } else { - ReplyMessage.send(recipient, pId, rex, dm, !this.appliedOperation, this.closed, false, + // Don't respond to distributed-no-ack message. + if (pId != 0 || (!(dm instanceof DistributionManager)) || directAck) { + ReplyMessage.send(recipient, pId, rex, dm, !appliedOperation, closed, false, isInternal()); } } @@ -1325,7 +1306,8 @@ public abstract class DistributedCacheOperation { protected abstract InternalCacheEvent createEvent(DistributedRegion rgn) throws EntryNotFoundException; - protected abstract boolean operateOnRegion(CacheEvent event, ClusterDistributionManager dm) + protected abstract boolean operateOnRegion(CacheEvent<?, ?> event, + ClusterDistributionManager dm) throws EntryNotFoundException; @Override @@ -1333,7 +1315,7 @@ public abstract class DistributedCacheOperation { StringBuilder buff = new StringBuilder(); buff.append(getShortClassName()); buff.append("(region path='"); // make sure this is the first one - buff.append(this.regionPath); + buff.append(regionPath); buff.append("'"); appendFields(buff); buff.append(")"); @@ -1345,28 +1327,28 @@ public abstract class DistributedCacheOperation { buff.append("; sender="); buff.append(getSender()); buff.append("; callbackArg="); - buff.append(this.callbackArg); + buff.append(callbackArg); buff.append("; processorId="); - buff.append(this.processorId); + buff.append(processorId); buff.append("; op="); - buff.append(this.op); + buff.append(op); buff.append("; applied="); - buff.append(this.appliedOperation); + buff.append(appliedOperation); buff.append("; directAck="); - buff.append(this.directAck); + buff.append(directAck); buff.append("; posdup="); - buff.append(this.possibleDuplicate); + buff.append(possibleDuplicate); buff.append("; hasDelta="); - buff.append(this.hasDelta); + buff.append(hasDelta); buff.append("; hasOldValue="); - buff.append(this.hasOldValue); - if (this.versionTag != null) { + buff.append(hasOldValue); + if (versionTag != null) { buff.append("; version="); - buff.append(this.versionTag); + buff.append(versionTag); } - if (this.filterRouting != null) { + if (filterRouting != null) { buff.append(" "); - buff.append(this.filterRouting.toString()); + buff.append(filterRouting); } } @@ -1375,42 +1357,42 @@ public abstract class DistributedCacheOperation { DeserializationContext context) throws IOException, ClassNotFoundException { short bits = in.readShort(); short extBits = in.readShort(); - this.flags = bits; + flags = bits; setFlags(bits, in); - this.regionPath = DataSerializer.readString(in); - this.op = Operation.fromOrdinal(in.readByte()); + regionPath = DataSerializer.readString(in); + op = Operation.fromOrdinal(in.readByte()); // TODO dirack There's really no reason to send this flag across the wire // anymore - this.directAck = (bits & DIRECT_ACK_MASK) != 0; - this.possibleDuplicate = (bits & POSSIBLE_DUPLICATE_MASK) != 0; + directAck = (bits & DIRECT_ACK_MASK) != 0; + possibleDuplicate = (bits & POSSIBLE_DUPLICATE_MASK) != 0; if ((bits & CALLBACK_ARG_MASK) != 0) { - this.callbackArg = DataSerializer.readObject(in); + callbackArg = DataSerializer.readObject(in); } - this.hasDelta = (bits & DELTA_MASK) != 0; - this.hasOldValue = (bits & OLD_VALUE_MASK) != 0; - if (this.hasOldValue) { + hasDelta = (bits & DELTA_MASK) != 0; + hasOldValue = (bits & OLD_VALUE_MASK) != 0; + if (hasOldValue) { byte b = in.readByte(); if (b == 0) { - this.oldValueIsSerialized = false; + oldValueIsSerialized = false; } else if (b == 1) { - this.oldValueIsSerialized = true; + oldValueIsSerialized = true; } else { throw new IllegalStateException("expected 0 or 1"); } - this.oldValue = DataSerializer.readByteArray(in); + oldValue = DataSerializer.readByteArray(in); } boolean hasFilterInfo = (bits & FILTER_INFO_MASK) != 0; - this.needsRouting = (bits & NEEDS_ROUTING_MASK) != 0; + needsRouting = (bits & NEEDS_ROUTING_MASK) != 0; if (hasFilterInfo) { - this.filterRouting = new FilterRoutingInfo(); - InternalDataSerializer.invokeFromData(this.filterRouting, in); + filterRouting = new FilterRoutingInfo(); + InternalDataSerializer.invokeFromData(filterRouting, in); } if ((bits & VERSION_TAG_MASK) != 0) { boolean persistentTag = (bits & PERSISTENT_TAG_MASK) != 0; - this.versionTag = VersionTag.create(persistentTag, in); + versionTag = VersionTag.create(persistentTag, in); } if ((extBits & INHIBIT_NOTIFICATIONS_MASK) != 0) { - this.inhibitAllNotifications = true; + inhibitAllNotifications = true; } } @@ -1423,68 +1405,68 @@ public abstract class DistributedCacheOperation { extendedBits = computeCompressedExtBits(extendedBits); out.writeShort(bits); out.writeShort(extendedBits); - if (this.processorId > 0) { - out.writeInt(this.processorId); + if (processorId > 0) { + out.writeInt(processorId); } - DataSerializer.writeString(this.regionPath, out); - out.writeByte(this.op.ordinal); - if (this.callbackArg != null) { - DataSerializer.writeObject(this.callbackArg, out); + DataSerializer.writeString(regionPath, out); + out.writeByte(op.ordinal); + if (callbackArg != null) { + DataSerializer.writeObject(callbackArg, out); } - if (this.hasOldValue) { - out.writeByte(this.oldValueIsSerialized ? 1 : 0); + if (hasOldValue) { + out.writeByte(oldValueIsSerialized ? 1 : 0); // the receiving side expects that the old value will have been serialized // as a byte array - final byte policy = valueIsToDeserializationPolicy(this.oldValueIsSerialized); + final byte policy = valueIsToDeserializationPolicy(oldValueIsSerialized); final Object vObj; final byte[] vBytes; - if (!this.oldValueIsSerialized && this.oldValue instanceof byte[]) { + if (!oldValueIsSerialized && oldValue instanceof byte[]) { vObj = null; - vBytes = (byte[]) this.oldValue; + vBytes = (byte[]) oldValue; } else { - vObj = this.oldValue; + vObj = oldValue; vBytes = null; } writeValue(policy, vObj, vBytes, out); } - if (this.filterRouting != null) { - InternalDataSerializer.invokeToData(this.filterRouting, out); + if (filterRouting != null) { + InternalDataSerializer.invokeToData(filterRouting, out); } - if (this.versionTag != null) { - InternalDataSerializer.invokeToData(this.versionTag, out); + if (versionTag != null) { + InternalDataSerializer.invokeToData(versionTag, out); } } protected short computeCompressedShort(short bits) { - if (this.hasOldValue) { + if (hasOldValue) { bits |= OLD_VALUE_MASK; } - if (this.directAck) { + if (directAck) { bits |= DIRECT_ACK_MASK; } - if (this.possibleDuplicate) { + if (possibleDuplicate) { bits |= POSSIBLE_DUPLICATE_MASK; } - if (this.processorId != 0) { + if (processorId != 0) { bits |= HAS_PROCESSOR_ID; } - if (this.callbackArg != null) { + if (callbackArg != null) { bits |= CALLBACK_ARG_MASK; } - if (this.hasDelta) { + if (hasDelta) { bits |= DELTA_MASK; } - if (this.filterRouting != null) { + if (filterRouting != null) { bits |= FILTER_INFO_MASK; } - if (this.needsRouting) { + if (needsRouting) { bits |= NEEDS_ROUTING_MASK; } - if (this.versionTag != null) { + if (versionTag != null) { bits |= VERSION_TAG_MASK; } - if (this.versionTag instanceof DiskVersionTag) { + if (versionTag instanceof DiskVersionTag) { bits |= PERSISTENT_TAG_MASK; } if (inhibitAllNotifications) { @@ -1502,14 +1484,14 @@ public abstract class DistributedCacheOperation { protected void setFlags(short bits, DataInput in) throws IOException, ClassNotFoundException { if ((bits & HAS_PROCESSOR_ID) != 0) { - this.processorId = in.readInt(); - ReplyProcessor21.setMessageRPId(this.processorId); + processorId = in.readInt(); + ReplyProcessor21.setMessageRPId(processorId); } } @Override public boolean supportsDirectAck() { - return this.directAck; + return directAck; } public void setSendDelta(boolean sendDelta) { @@ -1533,16 +1515,16 @@ public abstract class DistributedCacheOperation { @Override public void importOldObject(Object ov, boolean isSerialized) { - this.oldValueIsSerialized = isSerialized; - this.oldValue = ov; - this.hasOldValue = true; + oldValueIsSerialized = isSerialized; + oldValue = ov; + hasOldValue = true; } @Override public void importOldBytes(byte[] ov, boolean isSerialized) { - this.oldValueIsSerialized = isSerialized; - this.oldValue = ov; - this.hasOldValue = true; + oldValueIsSerialized = isSerialized; + oldValue = ov; + hasOldValue = true; } protected boolean notifiesSerialGatewaySender(ClusterDistributionManager dm) { @@ -1564,26 +1546,22 @@ public abstract class DistributedCacheOperation { /** Custom subclass that keeps all ReplyExceptions */ private static class ReliableCacheReplyProcessor extends CacheOperationReplyProcessor { - private final Set failedMembers; + private final Set<InternalDistributedMember> failedMembers; - private final DistributionManager dm; - - public ReliableCacheReplyProcessor(InternalDistributedSystem system, Collection initMembers, - Set departedMembers) { + public ReliableCacheReplyProcessor(InternalDistributedSystem system, + Collection<InternalDistributedMember> initMembers, + Set<InternalDistributedMember> departedMembers) { super(system, initMembers); - this.dm = system.getDistributionManager(); - this.failedMembers = departedMembers; + failedMembers = departedMembers; } @Override protected synchronized void processException(DistributionMessage dmsg, ReplyException ex) { Throwable cause = ex.getCause(); // only interested in CacheClosedException and RegionDestroyedException - if (cause instanceof CancelException || cause instanceof RegionDestroyedException) { - this.failedMembers.add(dmsg.getSender()); - } else { + failedMembers.add(dmsg.getSender()); + if (!(cause instanceof CancelException) && !(cause instanceof RegionDestroyedException)) { // allow superclass to handle all other exceptions - this.failedMembers.add(dmsg.getSender()); super.processException(dmsg, ex); } } @@ -1594,7 +1572,7 @@ public abstract class DistributedCacheOperation { if (logger.isDebugEnabled()) { logger.debug("{} replied with ignored true", dmsg.getSender()); } - this.failedMembers.add(dmsg.getSender()); + failedMembers.add(dmsg.getSender()); } super.process(dmsg, warn); } @@ -1605,7 +1583,8 @@ public abstract class DistributedCacheOperation { public CopyOnWriteHashSet<InternalDistributedMember> closedMembers = new CopyOnWriteHashSet<>(); - public CacheOperationReplyProcessor(InternalDistributedSystem system, Collection initMembers) { + public CacheOperationReplyProcessor(InternalDistributedSystem system, + Collection<InternalDistributedMember> initMembers) { super(system, initMembers); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java index 606fac6..ff4bb2a 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java @@ -41,13 +41,14 @@ public class DistributedCacheOperationTest { @Test public void shouldBeMockable() throws Exception { DistributedCacheOperation mockDistributedCacheOperation = mock(DistributedCacheOperation.class); + @SuppressWarnings("unused") // forces CacheOperationMessage to be mockable CacheOperationMessage mockCacheOperationMessage = mock(CacheOperationMessage.class); Map<InternalDistributedMember, PersistentMemberID> persistentIds = new HashMap<>(); when(mockDistributedCacheOperation.supportsDirectAck()).thenReturn(false); - mockDistributedCacheOperation.waitForAckIfNeeded(mockCacheOperationMessage, persistentIds); + mockDistributedCacheOperation.waitForAckIfNeeded(persistentIds); - verify(mockDistributedCacheOperation, times(1)).waitForAckIfNeeded(mockCacheOperationMessage, + verify(mockDistributedCacheOperation, times(1)).waitForAckIfNeeded( persistentIds); assertThat(mockDistributedCacheOperation.supportsDirectAck()).isFalse(); @@ -80,7 +81,7 @@ public class DistributedCacheOperationTest { boolean endOperationInvoked; DistributedRegion region; - public TestOperation(CacheEvent event) { + public TestOperation(CacheEvent<?, ?> event) { super(event); } @@ -114,7 +115,7 @@ public class DistributedCacheOperationTest { @Test public void testDoRemoveDestroyTokensFromCqResultKeys() { Object key = new Object(); - HashMap hashMap = new HashMap(); + HashMap<Long, Integer> hashMap = new HashMap<>(); hashMap.put(1L, MessageType.LOCAL_DESTROY); EntryEventImpl baseEvent = mock(EntryEventImpl.class); ServerCQ serverCQ = mock(ServerCQ.class); @@ -123,7 +124,7 @@ public class DistributedCacheOperationTest { new DestroyOperation(baseEvent); when(baseEvent.getKey()).thenReturn(key); when(filterInfo.getCQs()).thenReturn(hashMap); - when(serverCQ.getFilterID()).thenReturn(new Long(1L)); + when(serverCQ.getFilterID()).thenReturn(1L); doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class)); distributedCacheOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);