This is an automated email from the ASF dual-hosted git repository. echobravo pushed a commit to branch revert-4086-feature/GEODE-6807 in repository https://gitbox.apache.org/repos/asf/geode.git
commit a7a5ecdde06ee38c9a6d98e888a65084c283e8c6 Author: Ernie Burghardt <eburgha...@pivotal.io> AuthorDate: Mon Oct 21 09:33:10 2019 -0700 Revert "GEODE-6807: cache adviseUpdate and adviseAllEventsOrCached" --- .../CacheDistributionAdvisorConcurrentTest.java | 105 ------------ .../distributed/internal/DistributionAdvisor.java | 16 +- .../internal/cache/CacheDistributionAdvisor.java | 98 +++--------- .../internal/cache/DistributedCacheOperation.java | 2 +- .../geode/internal/cache/DistributedRegion.java | 7 +- .../cache/CacheDistributionAdvisorTest.java | 176 --------------------- 6 files changed, 30 insertions(+), 374 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java deleted file mode 100644 index 9afbc52..0000000 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache; - - -import static org.apache.geode.test.concurrency.Utilities.availableProcessors; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.net.InetAddress; -import java.net.UnknownHostException; - -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.apache.geode.CancelCriterion; -import org.apache.geode.cache.Operation; -import org.apache.geode.distributed.internal.DistributionAdvisor; -import org.apache.geode.distributed.internal.DistributionManager; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.distributed.internal.membership.MemberAttributes; -import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; -import org.apache.geode.test.concurrency.ConcurrentTestRunner; -import org.apache.geode.test.concurrency.ParallelExecutor; - - -@RunWith(ConcurrentTestRunner.class) -public class CacheDistributionAdvisorConcurrentTest { - private final int count = availableProcessors() * 2; - - @Test - public void getAdviseAllEventsOrCachedForConcurrentUpdateShouldSucceed( - ParallelExecutor executor) throws Exception { - - DistributionAdvisor advisor = createCacheDistributionAdvisor(); - CacheProfile profile = createCacheProfile(); - advisor.putProfile(profile, true); - - executor.inParallel(() -> { - ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); - }, count); - executor.execute(); - - assertTrue(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached() - .contains(profile.getDistributedMember())); - assertEquals(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached().size(), 1); - - } - - @Test - public void getAdviseUpdateForConcurrentUpdateShouldSucceed( - ParallelExecutor executor) throws Exception { - - EntryEventImpl event = new EntryEventImpl(); - event.setNewValue(null); - event.setOperation(Operation.CREATE); - - DistributionAdvisor advisor = createCacheDistributionAdvisor(); - CacheProfile profile = createCacheProfile(); - advisor.putProfile(profile, true); - - executor.inParallel(() -> { - ((CacheDistributionAdvisor) advisor).adviseUpdate(event); - }, count); - executor.execute(); - - assertTrue(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached() - .contains(profile.getDistributedMember())); - assertEquals(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached().size(), 1); - - } - - private DistributionAdvisor createCacheDistributionAdvisor() { - CacheDistributionAdvisee advisee = mock(CacheDistributionAdvisee.class); - CancelCriterion cancelCriterion = mock(CancelCriterion.class); - when(advisee.getCancelCriterion()).thenReturn(cancelCriterion); - DistributionManager distributionManager = mock(DistributionManager.class); - when(advisee.getDistributionManager()).thenReturn(distributionManager); - CacheDistributionAdvisor result = - CacheDistributionAdvisor.createCacheDistributionAdvisor(advisee); - when(advisee.getDistributionAdvisor()).thenReturn(result); - return result; - } - - private CacheProfile createCacheProfile() throws UnknownHostException { - InternalDistributedMember member = - new InternalDistributedMember(InetAddress.getLocalHost(), 0, false, - false, MemberAttributes.DEFAULT); - return new CacheProfile(member, 1); - } -} diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java index a2e701a..0a33f51 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java @@ -171,12 +171,6 @@ public class DistributionAdvisor { private int numActiveProfiles = 0; /** - * Profiles version number - */ - protected volatile long profilesVersion = 0; - - - /** * A collection of MembershipListeners that want to be notified when a profile is added to or * removed from this DistributionAdvisor. The keys are membership listeners and the values are * Boolean.TRUE. @@ -1319,7 +1313,8 @@ public class DistributionAdvisor { // must synchronize when modifying profile array private synchronized boolean basicAddProfile(Profile p) { // don't add more than once, but replace existing profile - profilesVersion++; + // try { + int index = indexOfMemberId(p.getId()); if (index >= 0) { Profile[] oldProfiles = profiles; // volatile read @@ -1345,16 +1340,17 @@ public class DistributionAdvisor { * Perform work of removing the given member from this advisor. */ private synchronized Profile basicRemoveMemberId(ProfileId id) { - + // try { int i = indexOfMemberId(id); if (i >= 0) { - profilesVersion++; Profile profileRemoved = profiles[i]; basicRemoveIndex(i); return profileRemoved; } else return null; - + // } finally { + // Assert.assertTrue(-1 == indexOfMemberId(id)); + // } } private int indexOfMemberId(ProfileId id) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java index 725cc61..1f3c0ba 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java @@ -112,19 +112,6 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { // moved removedProfiles to DistributionAdvisor - private Set<InternalDistributedMember> adviseSetforAllEvents = Collections.emptySet(); - private volatile long adviseAllEventsVersion = -1; - - private Set<InternalDistributedMember> adviseSetforUpdate = Collections.emptySet(); - private volatile long adviseUpdateVersion = -1; - - private volatile long inRecoveryVersion = 0; - private volatile long adviseInRecoveryVersion = -1; - - public synchronized void incInRecoveryVersion() { - inRecoveryVersion++; - } - /** Creates a new instance of CacheDistributionAdvisor */ protected CacheDistributionAdvisor(CacheDistributionAdvisee region) { super(region); @@ -153,35 +140,19 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { /** * Returns a the set of members that either want all events or are caching data. * + * @param excludeInRecovery if true then members in recovery are excluded */ - Set<InternalDistributedMember> adviseAllEventsOrCached() + private Set<InternalDistributedMember> adviseAllEventsOrCached(final boolean excludeInRecovery) throws IllegalStateException { getAdvisee().getCancelCriterion().checkCancelInProgress(null); - - // minimize volatile reads by copying ref to local var - long tempProfilesVersion = profilesVersion; // volatile read - long tempInRecoveryVersion = inRecoveryVersion; // volatile read - - if (adviseAllEventsVersion != tempProfilesVersion - || adviseInRecoveryVersion != tempInRecoveryVersion) { - synchronized (adviseSetforAllEvents) { - if (adviseAllEventsVersion != tempProfilesVersion - || adviseInRecoveryVersion != tempInRecoveryVersion) { - - adviseSetforAllEvents = Collections.unmodifiableSet(adviseFilter(profile -> { - CacheProfile cp = (CacheProfile) profile; - if (cp.getInRecovery()) { - return false; - } - return cp.cachedOrAllEventsWithListener(); - })); - adviseAllEventsVersion = tempProfilesVersion; - adviseInRecoveryVersion = tempInRecoveryVersion; - } + return adviseFilter(profile -> { + assert profile instanceof CacheProfile; + CacheProfile cp = (CacheProfile) profile; + if (excludeInRecovery && cp.inRecovery) { + return false; } - } - return adviseSetforAllEvents; - + return cp.cachedOrAllEventsWithListener(); + }); } /** @@ -191,30 +162,18 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { Set adviseUpdate(final EntryEventImpl event) throws IllegalStateException { if (event.hasNewValue() || event.getOperation().isPutAll()) { // only need to distribute it to members that want all events or cache data - return adviseAllEventsOrCached(); + return adviseAllEventsOrCached(true/* fixes 41147 */); } else { // The new value is null so this is a create with a null value, // in which case we only need to distribute this message to replicates // or all events that are not a proxy or if a proxy has a listener - - // minimize volatile reads by copying ref to local var - long tempProfilesVersion = profilesVersion; // volatile read - - if (adviseUpdateVersion != tempProfilesVersion) { - synchronized (adviseSetforUpdate) { - if (adviseUpdateVersion != tempProfilesVersion) { - - adviseSetforUpdate = Collections.unmodifiableSet(adviseFilter(profile -> { - CacheProfile cp = (CacheProfile) profile; - DataPolicy dp = cp.getDataPolicy(); - return dp.withReplication() - || (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener)); - })); - adviseUpdateVersion = tempProfilesVersion; - } - } - } - return adviseSetforUpdate; + return adviseFilter(profile -> { + assert profile instanceof CacheProfile; + CacheProfile cp = (CacheProfile) profile; + DataPolicy dp = cp.dataPolicy; + return dp.withReplication() + || (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener)); + }); } } @@ -291,7 +250,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { * Same as adviseGeneric except in recovery excluded. */ public Set<InternalDistributedMember> adviseCacheOp() { - return adviseAllEventsOrCached(); + return adviseAllEventsOrCached(true); } /* @@ -301,7 +260,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { return adviseFilter(profile -> { assert profile instanceof CacheProfile; CacheProfile cp = (CacheProfile) profile; - return !cp.getInRecovery(); + return !cp.inRecovery; }); } @@ -324,7 +283,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { assert profile instanceof CacheProfile; CacheProfile prof = (CacheProfile) profile; // if region in cache is in recovery, exclude - if (prof.getInRecovery()) { + if (prof.inRecovery) { return false; } @@ -405,7 +364,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { } // if region in cache is in recovery, exclude - if (profile.getInRecovery()) { + if (profile.inRecovery) { uninitialized.add(profile.getDistributedMember()); continue; } @@ -494,13 +453,12 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { */ public static class CacheProfile extends DistributionAdvisor.Profile { public DataPolicy dataPolicy = DataPolicy.REPLICATE; - public InterestPolicy interestPolicy = InterestPolicy.DEFAULT; public boolean hasCacheLoader = false; public boolean hasCacheWriter = false; public boolean hasCacheListener = false; public Scope scope = Scope.DISTRIBUTED_NO_ACK; - private boolean inRecovery = false; + public boolean inRecovery = false; public Set<String> gatewaySenderIds = Collections.emptySet(); public Set<String> asyncEventQueueIds = Collections.emptySet(); /** @@ -652,18 +610,6 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { return dataPolicy.withPersistence(); } - public boolean getInRecovery() { - return inRecovery; - }; - - public void setInRecovery(boolean recovery) { - inRecovery = recovery; - }; - - public DataPolicy getDataPolicy() { - return dataPolicy; - } - /** Set the profile data information that is stored in a short */ protected void setIntInfo(int s) { if ((s & REPLICATE_MASK) != 0) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index aa96f44..0b13f2d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -345,7 +345,7 @@ public abstract class DistributedCacheOperation { try { // Recipients with CacheOp - Set<InternalDistributedMember> recipients = new HashSet<>(getRecipients()); + Set<InternalDistributedMember> recipients = getRecipients(); Map<InternalDistributedMember, PersistentMemberID> persistentIds = null; if (region.getDataPolicy().withPersistence()) { persistentIds = region.getDistributionAdvisor().adviseInitializedPersistentMembers(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index d04882a..f13bce3 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -2214,12 +2214,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute cacheProfile.hasCacheListener = hasListener(); Assert.assertTrue(scope.isDistributed()); cacheProfile.scope = scope; - - boolean newInRecovery = getImageState().getInRecovery(); - if (cacheProfile.getInRecovery() != newInRecovery) { - distAdvisor.incInRecoveryVersion(); - } - cacheProfile.setInRecovery(newInRecovery); + cacheProfile.inRecovery = getImageState().getInRecovery(); cacheProfile.isPersistent = getDataPolicy().withPersistence(); cacheProfile.setSubscriptionAttributes(getSubscriptionAttributes()); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java deleted file mode 100644 index 010e658..0000000 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.Set; - -import org.junit.Before; -import org.junit.Test; - -import org.apache.geode.CancelCriterion; -import org.apache.geode.cache.DataPolicy; -import org.apache.geode.cache.Operation; -import org.apache.geode.distributed.internal.DistributionAdvisor; -import org.apache.geode.distributed.internal.DistributionManager; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; - - -public class CacheDistributionAdvisorTest { - DistributionAdvisor advisor; - - @Before - public void setUp() { - CacheDistributionAdvisee advisee = mock(CacheDistributionAdvisee.class); - DistributionManager distributionManager = mock(DistributionManager.class); - when(advisee.getDistributionManager()).thenReturn(distributionManager); - CancelCriterion cancelCriterion = mock(CancelCriterion.class); - when(advisee.getCancelCriterion()).thenReturn(cancelCriterion); - advisor = - CacheDistributionAdvisor.createCacheDistributionAdvisor(advisee); - when(advisee.getDistributionAdvisor()).thenReturn(advisor); - - } - - @Test - public void testAdviseAllEventsOrCached() { - CacheProfile profile = mock(CacheProfile.class); - InternalDistributedMember member = mock(InternalDistributedMember.class); - when(profile.getId()).thenReturn(member); - when(profile.getInRecovery()).thenReturn(false); - when(profile.cachedOrAllEventsWithListener()).thenReturn(true); - when(profile.getDistributedMember()).thenReturn(member); - - advisor.putProfile(profile, true); - Set<InternalDistributedMember> targets1 = - ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); - - Set<InternalDistributedMember> targets2 = - ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); - - Set<InternalDistributedMember> targets3 = - ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); - - verify(profile, times(1)).getInRecovery(); - verify(profile, times(1)).cachedOrAllEventsWithListener(); - - } - - @Test - public void testAdviseAllEventsOrCached2() { - CacheProfile profile = mock(CacheProfile.class); - InternalDistributedMember member = mock(InternalDistributedMember.class); - when(profile.getId()).thenReturn(member); - when(profile.getInRecovery()).thenReturn(false); - when(profile.cachedOrAllEventsWithListener()).thenReturn(true); - when(profile.getDistributedMember()).thenReturn(member); - CacheProfile profile2 = mock(CacheProfile.class); - InternalDistributedMember member2 = mock(InternalDistributedMember.class); - when(profile2.getId()).thenReturn(member2); - when(profile2.getInRecovery()).thenReturn(false); - when(profile2.cachedOrAllEventsWithListener()).thenReturn(true); - when(profile2.getDistributedMember()).thenReturn(member2); - - advisor.putProfile(profile, true); - Set<InternalDistributedMember> targets1 = - ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); - - advisor.putProfile(profile2, true); - Set<InternalDistributedMember> targets2 = - ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); - - Set<InternalDistributedMember> targets3 = - ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached(); - - verify(profile, times(2)).getInRecovery(); - verify(profile, times(2)).cachedOrAllEventsWithListener(); - verify(profile2, times(1)).getInRecovery(); - verify(profile2, times(1)).cachedOrAllEventsWithListener(); - - } - - - @Test - public void testAdviseUpdate() { - CacheProfile profile = mock(CacheProfile.class); - InternalDistributedMember member = mock(InternalDistributedMember.class); - EntryEventImpl event = mock(EntryEventImpl.class); - when(event.hasNewValue()).thenReturn(false); - when(event.getOperation()).thenReturn(Operation.CREATE); - - when(profile.getId()).thenReturn(member); - when(profile.cachedOrAllEventsWithListener()).thenReturn(true); - when(profile.getDistributedMember()).thenReturn(member); - when(profile.getDataPolicy()).thenReturn(DataPolicy.REPLICATE); - - advisor.putProfile(profile, true); - Set<InternalDistributedMember> targets1 = - ((CacheDistributionAdvisor) advisor).adviseUpdate(event); - - Set<InternalDistributedMember> targets2 = - ((CacheDistributionAdvisor) advisor).adviseUpdate(event); - - Set<InternalDistributedMember> targets3 = - ((CacheDistributionAdvisor) advisor).adviseUpdate(event); - - verify(profile, times(1)).getDataPolicy(); - } - - @Test - public void testAdviseUpdate2() { - CacheProfile profile = mock(CacheProfile.class); - InternalDistributedMember member = mock(InternalDistributedMember.class); - EntryEventImpl event = mock(EntryEventImpl.class); - when(event.hasNewValue()).thenReturn(false); - when(event.getOperation()).thenReturn(Operation.CREATE); - - when(profile.getId()).thenReturn(member); - when(profile.cachedOrAllEventsWithListener()).thenReturn(true); - when(profile.getDistributedMember()).thenReturn(member); - when(profile.getDataPolicy()).thenReturn(DataPolicy.REPLICATE); - - CacheProfile profile2 = mock(CacheProfile.class); - InternalDistributedMember member2 = mock(InternalDistributedMember.class); - when(profile2.getId()).thenReturn(member2); - when(profile2.cachedOrAllEventsWithListener()).thenReturn(true); - when(profile2.getDistributedMember()).thenReturn(member2); - when(profile2.getDataPolicy()).thenReturn(DataPolicy.REPLICATE); - - when(event.hasNewValue()).thenReturn(false); - when(event.getOperation()).thenReturn(Operation.CREATE); - - advisor.putProfile(profile, true); - Set<InternalDistributedMember> targets1 = - ((CacheDistributionAdvisor) advisor).adviseUpdate(event); - - advisor.putProfile(profile2, true); - Set<InternalDistributedMember> targets2 = - ((CacheDistributionAdvisor) advisor).adviseUpdate(event); - - Set<InternalDistributedMember> targets3 = - ((CacheDistributionAdvisor) advisor).adviseUpdate(event); - - verify(profile, times(2)).getDataPolicy(); - verify(profile2, times(1)).getDataPolicy(); - - } - - -}